You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

620 lines
16 KiB

11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. flow "code.google.com/p/mxk/go1/flowcontrol"
  12. //"github.com/tendermint/log15"
  13. "github.com/tendermint/tendermint/binary"
  14. . "github.com/tendermint/tendermint/common"
  15. )
  16. const (
  17. numBatchMsgPackets = 10
  18. minReadBufferSize = 1024
  19. minWriteBufferSize = 1024
  20. flushThrottleMS = 50
  21. idleTimeoutMinutes = 5
  22. updateStatsSeconds = 2
  23. pingTimeoutMinutes = 2
  24. defaultSendRate = 51200 // 5Kb/s
  25. defaultRecvRate = 51200 // 5Kb/s
  26. defaultSendQueueCapacity = 1
  27. defaultRecvBufferCapacity = 4096
  28. defaultSendTimeoutSeconds = 10
  29. )
  30. type receiveCbFunc func(chId byte, msgBytes []byte)
  31. type errorCbFunc func(interface{})
  32. /*
  33. Each peer has one `MConnection` (multiplex connection) instance.
  34. __multiplex__ *noun* a system or signal involving simultaneous transmission of
  35. several messages along a single channel of communication.
  36. Each `MConnection` handles message transmission on multiple abstract communication
  37. `Channel`s. Each channel has a globally unique byte id.
  38. The byte id and the relative priorities of each `Channel` are configured upon
  39. initialization of the connection.
  40. There are two methods for sending messages:
  41. func (m MConnection) Send(chId byte, msg interface{}) bool {}
  42. func (m MConnection) TrySend(chId byte, msg interface{}) bool {}
  43. `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
  44. for the channel with the given id byte `chId`, or until the request times out.
  45. The message `msg` is serialized using the `tendermint/binary` submodule's
  46. `WriteBinary()` reflection routine.
  47. `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
  48. queue is full.
  49. Inbound message bytes are handled with an onReceive callback function.
  50. */
  51. type MConnection struct {
  52. conn net.Conn
  53. bufReader *bufio.Reader
  54. bufWriter *bufio.Writer
  55. sendMonitor *flow.Monitor
  56. recvMonitor *flow.Monitor
  57. sendRate int64
  58. recvRate int64
  59. flushTimer *ThrottleTimer // flush writes as necessary but throttled.
  60. send chan struct{}
  61. quit chan struct{}
  62. pingTimer *RepeatTimer // send pings periodically
  63. pong chan struct{}
  64. chStatsTimer *RepeatTimer // update channel stats periodically
  65. channels []*Channel
  66. channelsIdx map[byte]*Channel
  67. onReceive receiveCbFunc
  68. onError errorCbFunc
  69. started uint32
  70. stopped uint32
  71. errored uint32
  72. LocalAddress *NetAddress
  73. RemoteAddress *NetAddress
  74. }
  75. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  76. mconn := &MConnection{
  77. conn: conn,
  78. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  79. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  80. sendMonitor: flow.New(0, 0),
  81. recvMonitor: flow.New(0, 0),
  82. sendRate: defaultSendRate,
  83. recvRate: defaultRecvRate,
  84. flushTimer: NewThrottleTimer("flush", flushThrottleMS*time.Millisecond),
  85. send: make(chan struct{}, 1),
  86. quit: make(chan struct{}),
  87. pingTimer: NewRepeatTimer("ping", pingTimeoutMinutes*time.Minute),
  88. pong: make(chan struct{}),
  89. chStatsTimer: NewRepeatTimer("chStats", updateStatsSeconds*time.Second),
  90. onReceive: onReceive,
  91. onError: onError,
  92. LocalAddress: NewNetAddress(conn.LocalAddr()),
  93. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  94. }
  95. // Create channels
  96. var channelsIdx = map[byte]*Channel{}
  97. var channels = []*Channel{}
  98. for _, desc := range chDescs {
  99. channel := newChannel(mconn, desc)
  100. channelsIdx[channel.id] = channel
  101. channels = append(channels, channel)
  102. }
  103. mconn.channels = channels
  104. mconn.channelsIdx = channelsIdx
  105. return mconn
  106. }
  107. // .Start() begins multiplexing packets to and from "channels".
  108. func (c *MConnection) Start() {
  109. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  110. log.Debug("Starting MConnection", "connection", c)
  111. go c.sendRoutine()
  112. go c.recvRoutine()
  113. }
  114. }
  115. func (c *MConnection) Stop() {
  116. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  117. log.Debug("Stopping MConnection", "connection", c)
  118. close(c.quit)
  119. c.conn.Close()
  120. c.flushTimer.Stop()
  121. c.chStatsTimer.Stop()
  122. c.pingTimer.Stop()
  123. // We can't close pong safely here because
  124. // recvRoutine may write to it after we've stopped.
  125. // Though it doesn't need to get closed at all,
  126. // we close it @ recvRoutine.
  127. // close(c.pong)
  128. }
  129. }
  130. func (c *MConnection) String() string {
  131. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  132. }
  133. func (c *MConnection) flush() {
  134. err := c.bufWriter.Flush()
  135. if err != nil {
  136. log.Warn("MConnection flush failed", "error", err)
  137. }
  138. }
  139. // Catch panics, usually caused by remote disconnects.
  140. func (c *MConnection) _recover() {
  141. if r := recover(); r != nil {
  142. stack := debug.Stack()
  143. err := StackError{r, stack}
  144. c.stopForError(err)
  145. }
  146. }
  147. func (c *MConnection) stopForError(r interface{}) {
  148. c.Stop()
  149. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  150. if c.onError != nil {
  151. c.onError(r)
  152. }
  153. }
  154. }
  155. // Queues a message to be sent to channel.
  156. func (c *MConnection) Send(chId byte, msg interface{}) bool {
  157. if atomic.LoadUint32(&c.stopped) == 1 {
  158. return false
  159. }
  160. log.Debug("Send", "channel", chId, "connection", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg))
  161. // Send message to channel.
  162. channel, ok := c.channelsIdx[chId]
  163. if !ok {
  164. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  165. return false
  166. }
  167. success := channel.sendBytes(binary.BinaryBytes(msg))
  168. if success {
  169. // Wake up sendRoutine if necessary
  170. select {
  171. case c.send <- struct{}{}:
  172. default:
  173. }
  174. } else {
  175. log.Warn("Send failed", "channel", chId, "connection", c, "msg", msg)
  176. }
  177. return success
  178. }
  179. // Queues a message to be sent to channel.
  180. // Nonblocking, returns true if successful.
  181. func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
  182. if atomic.LoadUint32(&c.stopped) == 1 {
  183. return false
  184. }
  185. log.Debug("TrySend", "channel", chId, "connection", c, "msg", msg)
  186. // Send message to channel.
  187. channel, ok := c.channelsIdx[chId]
  188. if !ok {
  189. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  190. return false
  191. }
  192. ok = channel.trySendBytes(binary.BinaryBytes(msg))
  193. if ok {
  194. // Wake up sendRoutine if necessary
  195. select {
  196. case c.send <- struct{}{}:
  197. default:
  198. }
  199. }
  200. return ok
  201. }
  202. func (c *MConnection) CanSend(chId byte) bool {
  203. if atomic.LoadUint32(&c.stopped) == 1 {
  204. return false
  205. }
  206. channel, ok := c.channelsIdx[chId]
  207. if !ok {
  208. log.Error(Fmt("Unknown channel %X", chId))
  209. return false
  210. }
  211. return channel.canSend()
  212. }
  213. // sendRoutine polls for packets to send from channels.
  214. func (c *MConnection) sendRoutine() {
  215. defer c._recover()
  216. FOR_LOOP:
  217. for {
  218. var n int64
  219. var err error
  220. select {
  221. case <-c.flushTimer.Ch:
  222. // NOTE: flushTimer.Set() must be called every time
  223. // something is written to .bufWriter.
  224. c.flush()
  225. case <-c.chStatsTimer.Ch:
  226. for _, channel := range c.channels {
  227. channel.updateStats()
  228. }
  229. case <-c.pingTimer.Ch:
  230. log.Debug("Send Ping")
  231. binary.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  232. c.sendMonitor.Update(int(n))
  233. c.flush()
  234. case <-c.pong:
  235. log.Debug("Send Pong")
  236. binary.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  237. c.sendMonitor.Update(int(n))
  238. c.flush()
  239. case <-c.quit:
  240. break FOR_LOOP
  241. case <-c.send:
  242. // Send some msgPackets
  243. eof := c.sendSomeMsgPackets()
  244. if !eof {
  245. // Keep sendRoutine awake.
  246. select {
  247. case c.send <- struct{}{}:
  248. default:
  249. }
  250. }
  251. }
  252. if atomic.LoadUint32(&c.stopped) == 1 {
  253. break FOR_LOOP
  254. }
  255. if err != nil {
  256. log.Warn("Connection failed @ sendRoutine", "connection", c, "error", err)
  257. c.stopForError(err)
  258. break FOR_LOOP
  259. }
  260. }
  261. // Cleanup
  262. }
  263. // Returns true if messages from channels were exhausted.
  264. // Blocks in accordance to .sendMonitor throttling.
  265. func (c *MConnection) sendSomeMsgPackets() bool {
  266. // Block until .sendMonitor says we can write.
  267. // Once we're ready we send more than we asked for,
  268. // but amortized it should even out.
  269. c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true)
  270. // Now send some msgPackets.
  271. for i := 0; i < numBatchMsgPackets; i++ {
  272. if c.sendMsgPacket() {
  273. return true
  274. }
  275. }
  276. return false
  277. }
  278. // Returns true if messages from channels were exhausted.
  279. func (c *MConnection) sendMsgPacket() bool {
  280. // Choose a channel to create a msgPacket from.
  281. // The chosen channel will be the one whose recentlySent/priority is the least.
  282. var leastRatio float32 = math.MaxFloat32
  283. var leastChannel *Channel
  284. for _, channel := range c.channels {
  285. // If nothing to send, skip this channel
  286. if !channel.isSendPending() {
  287. continue
  288. }
  289. // Get ratio, and keep track of lowest ratio.
  290. ratio := float32(channel.recentlySent) / float32(channel.priority)
  291. if ratio < leastRatio {
  292. leastRatio = ratio
  293. leastChannel = channel
  294. }
  295. }
  296. // Nothing to send?
  297. if leastChannel == nil {
  298. return true
  299. } else {
  300. // log.Debug("Found a msgPacket to send")
  301. }
  302. // Make & send a msgPacket from this channel
  303. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  304. if err != nil {
  305. log.Warn("Failed to write msgPacket", "error", err)
  306. c.stopForError(err)
  307. return true
  308. }
  309. c.sendMonitor.Update(int(n))
  310. c.flushTimer.Set()
  311. return false
  312. }
  313. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  314. // After a whole message has been assembled, it's pushed to onReceive().
  315. // Blocks depending on how the connection is throttled.
  316. func (c *MConnection) recvRoutine() {
  317. defer c._recover()
  318. FOR_LOOP:
  319. for {
  320. // Block until .recvMonitor says we can read.
  321. c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true)
  322. /*
  323. // Peek into bufReader for debugging
  324. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  325. log.Debug("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  326. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  327. if err == nil {
  328. return bytes
  329. } else {
  330. log.Warn("Error peeking connection buffer", "error", err)
  331. return nil
  332. }
  333. }})
  334. }
  335. */
  336. // Read packet type
  337. var n int64
  338. var err error
  339. pktType := binary.ReadByte(c.bufReader, &n, &err)
  340. c.recvMonitor.Update(int(n))
  341. if err != nil {
  342. if atomic.LoadUint32(&c.stopped) != 1 {
  343. log.Warn("Connection failed @ recvRoutine (reading byte)", "connection", c, "error", err)
  344. c.stopForError(err)
  345. }
  346. break FOR_LOOP
  347. }
  348. // Read more depending on packet type.
  349. switch pktType {
  350. case packetTypePing:
  351. // TODO: prevent abuse, as they cause flush()'s.
  352. log.Debug("Receive Ping")
  353. c.pong <- struct{}{}
  354. case packetTypePong:
  355. // do nothing
  356. log.Debug("Receive Pong")
  357. case packetTypeMsg:
  358. pkt, n, err := msgPacket{}, new(int64), new(error)
  359. binary.ReadBinary(&pkt, c.bufReader, n, err)
  360. c.recvMonitor.Update(int(*n))
  361. if *err != nil {
  362. if atomic.LoadUint32(&c.stopped) != 1 {
  363. log.Warn("Connection failed @ recvRoutine", "connection", c, "error", *err)
  364. c.stopForError(*err)
  365. }
  366. break FOR_LOOP
  367. }
  368. channel, ok := c.channelsIdx[pkt.ChannelId]
  369. if !ok || channel == nil {
  370. panic(Fmt("Unknown channel %X", pkt.ChannelId))
  371. }
  372. msgBytes := channel.recvMsgPacket(pkt)
  373. if msgBytes != nil {
  374. //log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
  375. c.onReceive(pkt.ChannelId, msgBytes)
  376. }
  377. default:
  378. panic(Fmt("Unknown message type %X", pktType))
  379. }
  380. // TODO: shouldn't this go in the sendRoutine?
  381. // Better to send a ping packet when *we* haven't sent anything for a while.
  382. c.pingTimer.Reset()
  383. }
  384. // Cleanup
  385. close(c.pong)
  386. for _ = range c.pong {
  387. // Drain
  388. }
  389. }
  390. //-----------------------------------------------------------------------------
  391. type ChannelDescriptor struct {
  392. Id byte
  393. Priority uint
  394. SendQueueCapacity uint
  395. RecvBufferCapacity uint
  396. }
  397. func (chDesc *ChannelDescriptor) FillDefaults() {
  398. if chDesc.SendQueueCapacity == 0 {
  399. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  400. }
  401. if chDesc.RecvBufferCapacity == 0 {
  402. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  403. }
  404. }
  405. // TODO: lowercase.
  406. // NOTE: not goroutine-safe.
  407. type Channel struct {
  408. conn *MConnection
  409. desc *ChannelDescriptor
  410. id byte
  411. sendQueue chan []byte
  412. sendQueueSize uint32 // atomic.
  413. recving []byte
  414. sending []byte
  415. priority uint
  416. recentlySent int64 // exponential moving average
  417. }
  418. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  419. desc.FillDefaults()
  420. if desc.Priority <= 0 {
  421. panic("Channel default priority must be a postive integer")
  422. }
  423. return &Channel{
  424. conn: conn,
  425. desc: desc,
  426. id: desc.Id,
  427. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  428. recving: make([]byte, 0, desc.RecvBufferCapacity),
  429. priority: desc.Priority,
  430. }
  431. }
  432. // Queues message to send to this channel.
  433. // Goroutine-safe
  434. // Times out (and returns false) after defaultSendTimeoutSeconds
  435. func (ch *Channel) sendBytes(bytes []byte) bool {
  436. timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second)
  437. select {
  438. case <-timeout.C:
  439. // timeout
  440. return false
  441. case ch.sendQueue <- bytes:
  442. atomic.AddUint32(&ch.sendQueueSize, 1)
  443. return true
  444. }
  445. }
  446. // Queues message to send to this channel.
  447. // Nonblocking, returns true if successful.
  448. // Goroutine-safe
  449. func (ch *Channel) trySendBytes(bytes []byte) bool {
  450. select {
  451. case ch.sendQueue <- bytes:
  452. atomic.AddUint32(&ch.sendQueueSize, 1)
  453. return true
  454. default:
  455. return false
  456. }
  457. }
  458. // Goroutine-safe
  459. func (ch *Channel) loadSendQueueSize() (size int) {
  460. return int(atomic.LoadUint32(&ch.sendQueueSize))
  461. }
  462. // Goroutine-safe
  463. // Use only as a heuristic.
  464. func (ch *Channel) canSend() bool {
  465. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  466. }
  467. // Returns true if any msgPackets are pending to be sent.
  468. // Call before calling nextMsgPacket()
  469. // Goroutine-safe
  470. func (ch *Channel) isSendPending() bool {
  471. if len(ch.sending) == 0 {
  472. if len(ch.sendQueue) == 0 {
  473. return false
  474. }
  475. ch.sending = <-ch.sendQueue
  476. }
  477. return true
  478. }
  479. // Creates a new msgPacket to send.
  480. // Not goroutine-safe
  481. func (ch *Channel) nextMsgPacket() msgPacket {
  482. packet := msgPacket{}
  483. packet.ChannelId = byte(ch.id)
  484. packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
  485. if len(ch.sending) <= maxMsgPacketSize {
  486. packet.EOF = byte(0x01)
  487. ch.sending = nil
  488. atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
  489. } else {
  490. packet.EOF = byte(0x00)
  491. ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):]
  492. }
  493. return packet
  494. }
  495. // Writes next msgPacket to w.
  496. // Not goroutine-safe
  497. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
  498. packet := ch.nextMsgPacket()
  499. binary.WriteByte(packetTypeMsg, w, &n, &err)
  500. binary.WriteBinary(packet, w, &n, &err)
  501. if err != nil {
  502. ch.recentlySent += n
  503. }
  504. return
  505. }
  506. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  507. // Not goroutine-safe
  508. func (ch *Channel) recvMsgPacket(pkt msgPacket) []byte {
  509. ch.recving = append(ch.recving, pkt.Bytes...)
  510. if pkt.EOF == byte(0x01) {
  511. msgBytes := ch.recving
  512. ch.recving = make([]byte, 0, defaultRecvBufferCapacity)
  513. return msgBytes
  514. }
  515. return nil
  516. }
  517. // Call this periodically to update stats for throttling purposes.
  518. // Not goroutine-safe
  519. func (ch *Channel) updateStats() {
  520. // Exponential decay of stats.
  521. // TODO: optimize.
  522. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  523. }
  524. //-----------------------------------------------------------------------------
  525. const (
  526. maxMsgPacketSize = 1024
  527. packetTypePing = byte(0x01)
  528. packetTypePong = byte(0x02)
  529. packetTypeMsg = byte(0x03)
  530. )
  531. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  532. type msgPacket struct {
  533. ChannelId byte
  534. EOF byte // 1 means message ends here.
  535. Bytes []byte
  536. }
  537. func (p msgPacket) String() string {
  538. return fmt.Sprintf("MsgPacket{%X:%X}", p.ChannelId, p.Bytes)
  539. }
  540. //-----------------------------------------------------------------------------
  541. // Convenience struct for writing typed messages.
  542. // Reading requires a custom decoder that switches on the first type byte of a byteslice.
  543. type TypedMessage struct {
  544. Type byte
  545. Msg interface{}
  546. }
  547. func (tm TypedMessage) String() string {
  548. return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg)
  549. }