You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

619 lines
16 KiB

10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol"
  12. "github.com/tendermint/tendermint/binary" //"github.com/tendermint/log15"
  13. . "github.com/tendermint/tendermint/common"
  14. )
  // Tunables for MConnection buffering, timers and throttling.
  const (
  	// Upper bound on msgPackets written per sendSomeMsgPackets call.
  	numBatchMsgPackets = 10
  	// Sizes passed to bufio.NewReaderSize / bufio.NewWriterSize.
  	minReadBufferSize = 1024
  	minWriteBufferSize = 1024
  	// Multiplied by time.Millisecond for the flush throttle timer.
  	flushThrottleMS = 50
  	idleTimeoutMinutes = 5
  	// Multiplied by time.Second for the channel-stats timer.
  	updateStatsSeconds = 2
  	// Multiplied by time.Minute for the ping timer.
  	pingTimeoutMinutes = 2
  	// NOTE(review): presumably bytes per second as consumed by
  	// flow.Monitor.Limit, i.e. 50KiB/s (the earlier "5Kb/s" note did not
  	// match the value) -- confirm against the flowcontrol package.
  	defaultSendRate = 51200
  	defaultRecvRate = 51200
  	// Applied by ChannelDescriptor.FillDefaults when the field is zero.
  	defaultSendQueueCapacity = 1
  	defaultRecvBufferCapacity = 4096
  	// Multiplied by time.Second for the blocking send timeout.
  	defaultSendTimeoutSeconds = 10
  )
  // Callback signatures accepted by NewMConnection.
  type (
  	// receiveCbFunc is called by recvRoutine with the channel id and the
  	// bytes of each completely reassembled inbound message.
  	receiveCbFunc func(chId byte, msgBytes []byte)

  	// errorCbFunc is called (at most once per connection) with the value
  	// handed to stopForError.
  	errorCbFunc func(interface{})
  )
  /*
  Each peer has one `MConnection` (multiplex connection) instance.

  __multiplex__ *noun* a system or signal involving simultaneous transmission of
  several messages along a single channel of communication.

  Each `MConnection` handles message transmission on multiple abstract communication
  `Channel`s. Each channel has a globally unique byte id.
  The byte id and the relative priorities of each `Channel` are configured upon
  initialization of the connection.

  There are two methods for sending messages:
  	func (c *MConnection) Send(chId byte, msg interface{}) bool {}
  	func (c *MConnection) TrySend(chId byte, msg interface{}) bool {}

  `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
  for the channel with the given id byte `chId`, or until the request times out.
  The message `msg` is serialized using the `tendermint/binary` submodule's
  `WriteBinary()` reflection routine.

  `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
  queue is full.

  Inbound message bytes are handled with an onReceive callback function.
  */
  type MConnection struct {
  	conn net.Conn // underlying transport; closed by Stop()
  	bufReader *bufio.Reader // buffered view of conn used by recvRoutine
  	bufWriter *bufio.Writer // buffered view of conn used by sendRoutine
  	sendMonitor *flow.Monitor // throttles / accounts outbound bytes
  	recvMonitor *flow.Monitor // throttles / accounts inbound bytes
  	sendRate int64 // read atomically; passed to sendMonitor.Limit
  	recvRate int64 // read atomically; passed to recvMonitor.Limit
  	flushTimer *ThrottleTimer // flush writes as necessary but throttled.
  	send chan struct{} // capacity-1 wake-up signal for sendRoutine
  	quit chan struct{} // closed by Stop()
  	pingTimer *RepeatTimer // send pings periodically
  	pong chan struct{} // recvRoutine asks sendRoutine to answer a ping
  	chStatsTimer *RepeatTimer // update channel stats periodically
  	channels []*Channel // all channels, in descriptor order
  	channelsIdx map[byte]*Channel // channels keyed by channel id
  	onReceive receiveCbFunc // called with each complete inbound message
  	onError errorCbFunc // called once from stopForError; may be nil
  	started uint32 // atomic flag: set to 1 by Start()
  	stopped uint32 // atomic flag: set to 1 by Stop()
  	errored uint32 // atomic flag: set to 1 by the first stopForError()
  	LocalAddress *NetAddress
  	RemoteAddress *NetAddress
  }
  74. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  75. mconn := &MConnection{
  76. conn: conn,
  77. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  78. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  79. sendMonitor: flow.New(0, 0),
  80. recvMonitor: flow.New(0, 0),
  81. sendRate: defaultSendRate,
  82. recvRate: defaultRecvRate,
  83. flushTimer: NewThrottleTimer("flush", flushThrottleMS*time.Millisecond),
  84. send: make(chan struct{}, 1),
  85. quit: make(chan struct{}),
  86. pingTimer: NewRepeatTimer("ping", pingTimeoutMinutes*time.Minute),
  87. pong: make(chan struct{}),
  88. chStatsTimer: NewRepeatTimer("chStats", updateStatsSeconds*time.Second),
  89. onReceive: onReceive,
  90. onError: onError,
  91. LocalAddress: NewNetAddress(conn.LocalAddr()),
  92. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  93. }
  94. // Create channels
  95. var channelsIdx = map[byte]*Channel{}
  96. var channels = []*Channel{}
  97. for _, desc := range chDescs {
  98. channel := newChannel(mconn, desc)
  99. channelsIdx[channel.id] = channel
  100. channels = append(channels, channel)
  101. }
  102. mconn.channels = channels
  103. mconn.channelsIdx = channelsIdx
  104. return mconn
  105. }
  106. // .Start() begins multiplexing packets to and from "channels".
  107. func (c *MConnection) Start() {
  108. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  109. log.Debug("Starting MConnection", "connection", c)
  110. go c.sendRoutine()
  111. go c.recvRoutine()
  112. }
  113. }
  114. func (c *MConnection) Stop() {
  115. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  116. log.Debug("Stopping MConnection", "connection", c)
  117. close(c.quit)
  118. c.conn.Close()
  119. c.flushTimer.Stop()
  120. c.chStatsTimer.Stop()
  121. c.pingTimer.Stop()
  122. // We can't close pong safely here because
  123. // recvRoutine may write to it after we've stopped.
  124. // Though it doesn't need to get closed at all,
  125. // we close it @ recvRoutine.
  126. // close(c.pong)
  127. }
  128. }
  129. func (c *MConnection) String() string {
  130. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  131. }
  132. func (c *MConnection) flush() {
  133. err := c.bufWriter.Flush()
  134. if err != nil {
  135. log.Warn("MConnection flush failed", "error", err)
  136. }
  137. }
  138. // Catch panics, usually caused by remote disconnects.
  139. func (c *MConnection) _recover() {
  140. if r := recover(); r != nil {
  141. stack := debug.Stack()
  142. err := StackError{r, stack}
  143. c.stopForError(err)
  144. }
  145. }
  146. func (c *MConnection) stopForError(r interface{}) {
  147. c.Stop()
  148. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  149. if c.onError != nil {
  150. c.onError(r)
  151. }
  152. }
  153. }
  154. // Queues a message to be sent to channel.
  155. func (c *MConnection) Send(chId byte, msg interface{}) bool {
  156. if atomic.LoadUint32(&c.stopped) == 1 {
  157. return false
  158. }
  159. log.Debug("Send", "channel", chId, "connection", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg))
  160. // Send message to channel.
  161. channel, ok := c.channelsIdx[chId]
  162. if !ok {
  163. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  164. return false
  165. }
  166. success := channel.sendBytes(binary.BinaryBytes(msg))
  167. if success {
  168. // Wake up sendRoutine if necessary
  169. select {
  170. case c.send <- struct{}{}:
  171. default:
  172. }
  173. } else {
  174. log.Warn("Send failed", "channel", chId, "connection", c, "msg", msg)
  175. }
  176. return success
  177. }
  178. // Queues a message to be sent to channel.
  179. // Nonblocking, returns true if successful.
  180. func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
  181. if atomic.LoadUint32(&c.stopped) == 1 {
  182. return false
  183. }
  184. log.Debug("TrySend", "channel", chId, "connection", c, "msg", msg)
  185. // Send message to channel.
  186. channel, ok := c.channelsIdx[chId]
  187. if !ok {
  188. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  189. return false
  190. }
  191. ok = channel.trySendBytes(binary.BinaryBytes(msg))
  192. if ok {
  193. // Wake up sendRoutine if necessary
  194. select {
  195. case c.send <- struct{}{}:
  196. default:
  197. }
  198. }
  199. return ok
  200. }
  201. func (c *MConnection) CanSend(chId byte) bool {
  202. if atomic.LoadUint32(&c.stopped) == 1 {
  203. return false
  204. }
  205. channel, ok := c.channelsIdx[chId]
  206. if !ok {
  207. log.Error(Fmt("Unknown channel %X", chId))
  208. return false
  209. }
  210. return channel.canSend()
  211. }
  212. // sendRoutine polls for packets to send from channels.
  213. func (c *MConnection) sendRoutine() {
  214. defer c._recover()
  215. FOR_LOOP:
  216. for {
  217. var n int64
  218. var err error
  219. select {
  220. case <-c.flushTimer.Ch:
  221. // NOTE: flushTimer.Set() must be called every time
  222. // something is written to .bufWriter.
  223. c.flush()
  224. case <-c.chStatsTimer.Ch:
  225. for _, channel := range c.channels {
  226. channel.updateStats()
  227. }
  228. case <-c.pingTimer.Ch:
  229. log.Debug("Send Ping")
  230. binary.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  231. c.sendMonitor.Update(int(n))
  232. c.flush()
  233. case <-c.pong:
  234. log.Debug("Send Pong")
  235. binary.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  236. c.sendMonitor.Update(int(n))
  237. c.flush()
  238. case <-c.quit:
  239. break FOR_LOOP
  240. case <-c.send:
  241. // Send some msgPackets
  242. eof := c.sendSomeMsgPackets()
  243. if !eof {
  244. // Keep sendRoutine awake.
  245. select {
  246. case c.send <- struct{}{}:
  247. default:
  248. }
  249. }
  250. }
  251. if atomic.LoadUint32(&c.stopped) == 1 {
  252. break FOR_LOOP
  253. }
  254. if err != nil {
  255. log.Warn("Connection failed @ sendRoutine", "connection", c, "error", err)
  256. c.stopForError(err)
  257. break FOR_LOOP
  258. }
  259. }
  260. // Cleanup
  261. }
  262. // Returns true if messages from channels were exhausted.
  263. // Blocks in accordance to .sendMonitor throttling.
  264. func (c *MConnection) sendSomeMsgPackets() bool {
  265. // Block until .sendMonitor says we can write.
  266. // Once we're ready we send more than we asked for,
  267. // but amortized it should even out.
  268. c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true)
  269. // Now send some msgPackets.
  270. for i := 0; i < numBatchMsgPackets; i++ {
  271. if c.sendMsgPacket() {
  272. return true
  273. }
  274. }
  275. return false
  276. }
  277. // Returns true if messages from channels were exhausted.
  278. func (c *MConnection) sendMsgPacket() bool {
  279. // Choose a channel to create a msgPacket from.
  280. // The chosen channel will be the one whose recentlySent/priority is the least.
  281. var leastRatio float32 = math.MaxFloat32
  282. var leastChannel *Channel
  283. for _, channel := range c.channels {
  284. // If nothing to send, skip this channel
  285. if !channel.isSendPending() {
  286. continue
  287. }
  288. // Get ratio, and keep track of lowest ratio.
  289. ratio := float32(channel.recentlySent) / float32(channel.priority)
  290. if ratio < leastRatio {
  291. leastRatio = ratio
  292. leastChannel = channel
  293. }
  294. }
  295. // Nothing to send?
  296. if leastChannel == nil {
  297. return true
  298. } else {
  299. // log.Debug("Found a msgPacket to send")
  300. }
  301. // Make & send a msgPacket from this channel
  302. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  303. if err != nil {
  304. log.Warn("Failed to write msgPacket", "error", err)
  305. c.stopForError(err)
  306. return true
  307. }
  308. c.sendMonitor.Update(int(n))
  309. c.flushTimer.Set()
  310. return false
  311. }
  312. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  313. // After a whole message has been assembled, it's pushed to onReceive().
  314. // Blocks depending on how the connection is throttled.
  315. func (c *MConnection) recvRoutine() {
  316. defer c._recover()
  317. FOR_LOOP:
  318. for {
  319. // Block until .recvMonitor says we can read.
  320. c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true)
  321. /*
  322. // Peek into bufReader for debugging
  323. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  324. log.Debug("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  325. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  326. if err == nil {
  327. return bytes
  328. } else {
  329. log.Warn("Error peeking connection buffer", "error", err)
  330. return nil
  331. }
  332. }})
  333. }
  334. */
  335. // Read packet type
  336. var n int64
  337. var err error
  338. pktType := binary.ReadByte(c.bufReader, &n, &err)
  339. c.recvMonitor.Update(int(n))
  340. if err != nil {
  341. if atomic.LoadUint32(&c.stopped) != 1 {
  342. log.Warn("Connection failed @ recvRoutine (reading byte)", "connection", c, "error", err)
  343. c.stopForError(err)
  344. }
  345. break FOR_LOOP
  346. }
  347. // Read more depending on packet type.
  348. switch pktType {
  349. case packetTypePing:
  350. // TODO: prevent abuse, as they cause flush()'s.
  351. log.Debug("Receive Ping")
  352. c.pong <- struct{}{}
  353. case packetTypePong:
  354. // do nothing
  355. log.Debug("Receive Pong")
  356. case packetTypeMsg:
  357. pkt, n, err := msgPacket{}, new(int64), new(error)
  358. binary.ReadBinary(&pkt, c.bufReader, n, err)
  359. c.recvMonitor.Update(int(*n))
  360. if *err != nil {
  361. if atomic.LoadUint32(&c.stopped) != 1 {
  362. log.Warn("Connection failed @ recvRoutine", "connection", c, "error", *err)
  363. c.stopForError(*err)
  364. }
  365. break FOR_LOOP
  366. }
  367. channel, ok := c.channelsIdx[pkt.ChannelId]
  368. if !ok || channel == nil {
  369. panic(Fmt("Unknown channel %X", pkt.ChannelId))
  370. }
  371. msgBytes := channel.recvMsgPacket(pkt)
  372. if msgBytes != nil {
  373. //log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
  374. c.onReceive(pkt.ChannelId, msgBytes)
  375. }
  376. default:
  377. panic(Fmt("Unknown message type %X", pktType))
  378. }
  379. // TODO: shouldn't this go in the sendRoutine?
  380. // Better to send a ping packet when *we* haven't sent anything for a while.
  381. c.pingTimer.Reset()
  382. }
  383. // Cleanup
  384. close(c.pong)
  385. for _ = range c.pong {
  386. // Drain
  387. }
  388. }
  389. //-----------------------------------------------------------------------------
  // ChannelDescriptor configures one Channel of an MConnection.
  type ChannelDescriptor struct {
  	Id byte // channel id; used as the key in MConnection.channelsIdx
  	Priority uint // must be non-zero (newChannel panics otherwise)
  	SendQueueCapacity uint // capacity of the channel's send queue; 0 means use the default
  	RecvBufferCapacity uint // initial capacity of the reassembly buffer; 0 means use the default
  }
  396. func (chDesc *ChannelDescriptor) FillDefaults() {
  397. if chDesc.SendQueueCapacity == 0 {
  398. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  399. }
  400. if chDesc.RecvBufferCapacity == 0 {
  401. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  402. }
  403. }
  // Channel is one logical message stream multiplexed over an MConnection.
  // TODO: lowercase.
  // NOTE: not goroutine-safe.
  type Channel struct {
  	conn *MConnection // owning connection
  	desc *ChannelDescriptor // configuration this channel was built from
  	id byte // copy of desc.Id
  	sendQueue chan []byte // serialized messages waiting to be sent
  	sendQueueSize uint32 // atomic.
  	recving []byte // bytes of the inbound message being reassembled
  	sending []byte // unsent remainder of the outbound message in progress
  	priority uint // copy of desc.Priority; divisor in channel selection
  	recentlySent int64 // exponential moving average
  }
  417. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  418. desc.FillDefaults()
  419. if desc.Priority <= 0 {
  420. panic("Channel default priority must be a postive integer")
  421. }
  422. return &Channel{
  423. conn: conn,
  424. desc: desc,
  425. id: desc.Id,
  426. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  427. recving: make([]byte, 0, desc.RecvBufferCapacity),
  428. priority: desc.Priority,
  429. }
  430. }
  431. // Queues message to send to this channel.
  432. // Goroutine-safe
  433. // Times out (and returns false) after defaultSendTimeoutSeconds
  434. func (ch *Channel) sendBytes(bytes []byte) bool {
  435. timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second)
  436. select {
  437. case <-timeout.C:
  438. // timeout
  439. return false
  440. case ch.sendQueue <- bytes:
  441. atomic.AddUint32(&ch.sendQueueSize, 1)
  442. return true
  443. }
  444. }
  445. // Queues message to send to this channel.
  446. // Nonblocking, returns true if successful.
  447. // Goroutine-safe
  448. func (ch *Channel) trySendBytes(bytes []byte) bool {
  449. select {
  450. case ch.sendQueue <- bytes:
  451. atomic.AddUint32(&ch.sendQueueSize, 1)
  452. return true
  453. default:
  454. return false
  455. }
  456. }
  457. // Goroutine-safe
  458. func (ch *Channel) loadSendQueueSize() (size int) {
  459. return int(atomic.LoadUint32(&ch.sendQueueSize))
  460. }
  461. // Goroutine-safe
  462. // Use only as a heuristic.
  463. func (ch *Channel) canSend() bool {
  464. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  465. }
  466. // Returns true if any msgPackets are pending to be sent.
  467. // Call before calling nextMsgPacket()
  468. // Goroutine-safe
  469. func (ch *Channel) isSendPending() bool {
  470. if len(ch.sending) == 0 {
  471. if len(ch.sendQueue) == 0 {
  472. return false
  473. }
  474. ch.sending = <-ch.sendQueue
  475. }
  476. return true
  477. }
  478. // Creates a new msgPacket to send.
  479. // Not goroutine-safe
  480. func (ch *Channel) nextMsgPacket() msgPacket {
  481. packet := msgPacket{}
  482. packet.ChannelId = byte(ch.id)
  483. packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
  484. if len(ch.sending) <= maxMsgPacketSize {
  485. packet.EOF = byte(0x01)
  486. ch.sending = nil
  487. atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
  488. } else {
  489. packet.EOF = byte(0x00)
  490. ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):]
  491. }
  492. return packet
  493. }
  494. // Writes next msgPacket to w.
  495. // Not goroutine-safe
  496. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
  497. packet := ch.nextMsgPacket()
  498. binary.WriteByte(packetTypeMsg, w, &n, &err)
  499. binary.WriteBinary(packet, w, &n, &err)
  500. if err != nil {
  501. ch.recentlySent += n
  502. }
  503. return
  504. }
  505. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  506. // Not goroutine-safe
  507. func (ch *Channel) recvMsgPacket(pkt msgPacket) []byte {
  508. ch.recving = append(ch.recving, pkt.Bytes...)
  509. if pkt.EOF == byte(0x01) {
  510. msgBytes := ch.recving
  511. ch.recving = make([]byte, 0, defaultRecvBufferCapacity)
  512. return msgBytes
  513. }
  514. return nil
  515. }
  516. // Call this periodically to update stats for throttling purposes.
  517. // Not goroutine-safe
  518. func (ch *Channel) updateStats() {
  519. // Exponential decay of stats.
  520. // TODO: optimize.
  521. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  522. }
  523. //-----------------------------------------------------------------------------
  // Wire-level constants.
  const (
  	// Largest number of message bytes carried by one msgPacket.
  	maxMsgPacketSize = 1024
  	// First byte of every packet, identifying its kind.
  	packetTypePing = byte(0x01)
  	packetTypePong = byte(0x02)
  	packetTypeMsg = byte(0x03)
  )
  // msgPacket is one fragment of a channel message. Messages in channels are
  // chopped into these smaller packets for multiplexing.
  type msgPacket struct {
  	ChannelId byte
  	EOF       byte // 1 means message ends here.
  	Bytes     []byte
  }

  // String renders the channel id and payload in upper-case hex.
  func (p msgPacket) String() string {
  	id, payload := p.ChannelId, p.Bytes
  	return fmt.Sprintf("MsgPacket{%X:%X}", id, payload)
  }
  539. //-----------------------------------------------------------------------------
  // TypedMessage is a convenience struct for writing typed messages.
  // Reading requires a custom decoder that switches on the first type byte of
  // a byteslice.
  type TypedMessage struct {
  	Type byte
  	Msg  interface{}
  }

  // String renders the type byte in upper-case hex followed by the message.
  func (tm TypedMessage) String() string {
  	kind, body := tm.Type, tm.Msg
  	return fmt.Sprintf("TMsg{%X:%v}", kind, body)
  }