You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

640 lines
16 KiB

11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. flow "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol"
  12. . "github.com/tendermint/tendermint/common"
  13. "github.com/tendermint/tendermint/wire" //"github.com/tendermint/log15"
  14. )
// Tunables for MConnection buffering, timers and rate limiting.
const (
	numBatchMsgPackets        = 10    // upper bound on msgPackets written per sendSomeMsgPackets() call
	minReadBufferSize         = 1024  // size handed to bufio.NewReaderSize for the connection reader
	minWriteBufferSize        = 1024  // size handed to bufio.NewWriterSize for the connection writer
	idleTimeoutMinutes        = 5     // NOTE(review): not referenced in this part of the file
	updateStatsSeconds        = 2     // period of chStatsTimer (channel stats decay)
	pingTimeoutSeconds        = 40    // period of pingTimer
	defaultSendRate           = 10240 // initial sendRate; presumably bytes/s (10 KiB/s) as consumed by flow.Monitor.Limit — confirm
	defaultRecvRate           = 10240 // initial recvRate; presumably bytes/s (10 KiB/s) as consumed by flow.Monitor.Limit — confirm
	flushThrottleMS           = 100   // throttle interval of flushTimer, in milliseconds
	defaultSendQueueCapacity  = 1     // SendQueueCapacity used when a ChannelDescriptor leaves it at 0
	defaultRecvBufferCapacity = 4096  // RecvBufferCapacity used when a ChannelDescriptor leaves it at 0
	defaultSendTimeoutSeconds = 10    // how long Channel.sendBytes waits for queue space before giving up
)
// receiveCbFunc is called by recvRoutine with the channel id and the bytes of
// a fully reassembled message.
type receiveCbFunc func(chId byte, msgBytes []byte)

// errorCbFunc is called by stopForError with the error (or recovered panic
// value) that caused the connection to stop.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (c *MConnection) Send(chId byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chId byte, msg interface{}) bool {}

`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chId`, or until the request times out.
The message `msg` is serialized using the `tendermint/wire` submodule's
`BinaryBytes()` routine before it is queued.

`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Both methods return false when the connection is not running or `chId` is unknown.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	BaseService
	conn          net.Conn          // underlying connection; closed in OnStop
	bufReader     *bufio.Reader     // buffered reader over conn, read by recvRoutine
	bufWriter     *bufio.Writer     // buffered writer over conn, written by sendRoutine
	sendMonitor   *flow.Monitor     // accounts for and limits written bytes
	recvMonitor   *flow.Monitor     // accounts for and limits read bytes
	sendRate      int64             // rate passed to sendMonitor.Limit; loaded atomically
	recvRate      int64             // rate passed to recvMonitor.Limit; loaded atomically
	send          chan struct{}     // capacity-1 wake-up signal for sendRoutine
	pong          chan struct{}     // recvRoutine signals sendRoutine to answer a ping
	channels      []*Channel        // channels in the order of the descriptors given to NewMConnection
	channelsIdx   map[byte]*Channel // the same channels keyed by channel id
	onReceive     receiveCbFunc     // invoked with every fully reassembled message
	onError       errorCbFunc       // invoked at most once, from stopForError; may be nil
	errored       uint32            // 0 until stopForError flips it to 1 (compare-and-swap)
	quit          chan struct{}     // created in OnStart, closed in OnStop
	flushTimer    *ThrottleTimer    // flush writes as necessary but throttled.
	pingTimer     *RepeatTimer      // send pings periodically
	chStatsTimer  *RepeatTimer      // update channel stats periodically
	LocalAddress  *NetAddress       // built from conn.LocalAddr()
	RemoteAddress *NetAddress       // built from conn.RemoteAddr()
}
  73. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  74. mconn := &MConnection{
  75. conn: conn,
  76. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  77. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  78. sendMonitor: flow.New(0, 0),
  79. recvMonitor: flow.New(0, 0),
  80. sendRate: defaultSendRate,
  81. recvRate: defaultRecvRate,
  82. send: make(chan struct{}, 1),
  83. pong: make(chan struct{}),
  84. onReceive: onReceive,
  85. onError: onError,
  86. // Initialized in Start()
  87. quit: nil,
  88. flushTimer: nil,
  89. pingTimer: nil,
  90. chStatsTimer: nil,
  91. LocalAddress: NewNetAddress(conn.LocalAddr()),
  92. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  93. }
  94. // Create channels
  95. var channelsIdx = map[byte]*Channel{}
  96. var channels = []*Channel{}
  97. for _, desc := range chDescs {
  98. channel := newChannel(mconn, desc)
  99. channelsIdx[channel.id] = channel
  100. channels = append(channels, channel)
  101. }
  102. mconn.channels = channels
  103. mconn.channelsIdx = channelsIdx
  104. mconn.BaseService = *NewBaseService(log, "MConnection", mconn)
  105. return mconn
  106. }
  107. func (c *MConnection) OnStart() error {
  108. c.BaseService.OnStart()
  109. c.quit = make(chan struct{})
  110. c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
  111. c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second)
  112. c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
  113. go c.sendRoutine()
  114. go c.recvRoutine()
  115. return nil
  116. }
  117. func (c *MConnection) OnStop() {
  118. c.BaseService.OnStop()
  119. c.flushTimer.Stop()
  120. c.pingTimer.Stop()
  121. c.chStatsTimer.Stop()
  122. if c.quit != nil {
  123. close(c.quit)
  124. }
  125. c.conn.Close()
  126. // We can't close pong safely here because
  127. // recvRoutine may write to it after we've stopped.
  128. // Though it doesn't need to get closed at all,
  129. // we close it @ recvRoutine.
  130. // close(c.pong)
  131. }
  132. func (c *MConnection) String() string {
  133. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  134. }
  135. func (c *MConnection) flush() {
  136. log.Debug("Flush", "conn", c)
  137. err := c.bufWriter.Flush()
  138. if err != nil {
  139. log.Warn("MConnection flush failed", "error", err)
  140. }
  141. }
  142. // Catch panics, usually caused by remote disconnects.
  143. func (c *MConnection) _recover() {
  144. if r := recover(); r != nil {
  145. stack := debug.Stack()
  146. err := StackError{r, stack}
  147. c.stopForError(err)
  148. }
  149. }
  150. func (c *MConnection) stopForError(r interface{}) {
  151. c.Stop()
  152. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  153. if c.onError != nil {
  154. c.onError(r)
  155. }
  156. }
  157. }
  158. // Queues a message to be sent to channel.
  159. func (c *MConnection) Send(chId byte, msg interface{}) bool {
  160. if !c.IsRunning() {
  161. return false
  162. }
  163. log.Info("Send", "channel", chId, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
  164. // Send message to channel.
  165. channel, ok := c.channelsIdx[chId]
  166. if !ok {
  167. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  168. return false
  169. }
  170. success := channel.sendBytes(wire.BinaryBytes(msg))
  171. if success {
  172. // Wake up sendRoutine if necessary
  173. select {
  174. case c.send <- struct{}{}:
  175. default:
  176. }
  177. } else {
  178. log.Warn("Send failed", "channel", chId, "conn", c, "msg", msg)
  179. }
  180. return success
  181. }
  182. // Queues a message to be sent to channel.
  183. // Nonblocking, returns true if successful.
  184. func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
  185. if !c.IsRunning() {
  186. return false
  187. }
  188. log.Info("TrySend", "channel", chId, "conn", c, "msg", msg)
  189. // Send message to channel.
  190. channel, ok := c.channelsIdx[chId]
  191. if !ok {
  192. log.Error(Fmt("Cannot send bytes, unknown channel %X", chId))
  193. return false
  194. }
  195. ok = channel.trySendBytes(wire.BinaryBytes(msg))
  196. if ok {
  197. // Wake up sendRoutine if necessary
  198. select {
  199. case c.send <- struct{}{}:
  200. default:
  201. }
  202. }
  203. return ok
  204. }
  205. func (c *MConnection) CanSend(chId byte) bool {
  206. if !c.IsRunning() {
  207. return false
  208. }
  209. channel, ok := c.channelsIdx[chId]
  210. if !ok {
  211. log.Error(Fmt("Unknown channel %X", chId))
  212. return false
  213. }
  214. return channel.canSend()
  215. }
  216. // sendRoutine polls for packets to send from channels.
  217. func (c *MConnection) sendRoutine() {
  218. defer c._recover()
  219. FOR_LOOP:
  220. for {
  221. var n int64
  222. var err error
  223. select {
  224. case <-c.flushTimer.Ch:
  225. // NOTE: flushTimer.Set() must be called every time
  226. // something is written to .bufWriter.
  227. c.flush()
  228. case <-c.chStatsTimer.Ch:
  229. for _, channel := range c.channels {
  230. channel.updateStats()
  231. }
  232. case <-c.pingTimer.Ch:
  233. log.Info("Send Ping")
  234. wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
  235. c.sendMonitor.Update(int(n))
  236. c.flush()
  237. case <-c.pong:
  238. log.Info("Send Pong")
  239. wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
  240. c.sendMonitor.Update(int(n))
  241. c.flush()
  242. case <-c.quit:
  243. break FOR_LOOP
  244. case <-c.send:
  245. // Send some msgPackets
  246. eof := c.sendSomeMsgPackets()
  247. if !eof {
  248. // Keep sendRoutine awake.
  249. select {
  250. case c.send <- struct{}{}:
  251. default:
  252. }
  253. }
  254. }
  255. if !c.IsRunning() {
  256. break FOR_LOOP
  257. }
  258. if err != nil {
  259. log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err)
  260. c.stopForError(err)
  261. break FOR_LOOP
  262. }
  263. }
  264. // Cleanup
  265. }
  266. // Returns true if messages from channels were exhausted.
  267. // Blocks in accordance to .sendMonitor throttling.
  268. func (c *MConnection) sendSomeMsgPackets() bool {
  269. // Block until .sendMonitor says we can write.
  270. // Once we're ready we send more than we asked for,
  271. // but amortized it should even out.
  272. c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true)
  273. // Now send some msgPackets.
  274. for i := 0; i < numBatchMsgPackets; i++ {
  275. if c.sendMsgPacket() {
  276. return true
  277. }
  278. }
  279. return false
  280. }
  281. // Returns true if messages from channels were exhausted.
  282. func (c *MConnection) sendMsgPacket() bool {
  283. // Choose a channel to create a msgPacket from.
  284. // The chosen channel will be the one whose recentlySent/priority is the least.
  285. var leastRatio float32 = math.MaxFloat32
  286. var leastChannel *Channel
  287. for _, channel := range c.channels {
  288. // If nothing to send, skip this channel
  289. if !channel.isSendPending() {
  290. continue
  291. }
  292. // Get ratio, and keep track of lowest ratio.
  293. ratio := float32(channel.recentlySent) / float32(channel.priority)
  294. if ratio < leastRatio {
  295. leastRatio = ratio
  296. leastChannel = channel
  297. }
  298. }
  299. // Nothing to send?
  300. if leastChannel == nil {
  301. return true
  302. } else {
  303. // log.Info("Found a msgPacket to send")
  304. }
  305. // Make & send a msgPacket from this channel
  306. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  307. if err != nil {
  308. log.Warn("Failed to write msgPacket", "error", err)
  309. c.stopForError(err)
  310. return true
  311. }
  312. c.sendMonitor.Update(int(n))
  313. c.flushTimer.Set()
  314. return false
  315. }
  316. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  317. // After a whole message has been assembled, it's pushed to onReceive().
  318. // Blocks depending on how the connection is throttled.
  319. func (c *MConnection) recvRoutine() {
  320. defer c._recover()
  321. FOR_LOOP:
  322. for {
  323. // Block until .recvMonitor says we can read.
  324. c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true)
  325. /*
  326. // Peek into bufReader for debugging
  327. if numBytes := c.bufReader.Buffered(); numBytes > 0 {
  328. log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
  329. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  330. if err == nil {
  331. return bytes
  332. } else {
  333. log.Warn("Error peeking connection buffer", "error", err)
  334. return nil
  335. }
  336. }})
  337. }
  338. */
  339. // Read packet type
  340. var n int64
  341. var err error
  342. pktType := wire.ReadByte(c.bufReader, &n, &err)
  343. c.recvMonitor.Update(int(n))
  344. if err != nil {
  345. if c.IsRunning() {
  346. log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
  347. c.stopForError(err)
  348. }
  349. break FOR_LOOP
  350. }
  351. // Read more depending on packet type.
  352. switch pktType {
  353. case packetTypePing:
  354. // TODO: prevent abuse, as they cause flush()'s.
  355. log.Info("Receive Ping")
  356. c.pong <- struct{}{}
  357. case packetTypePong:
  358. // do nothing
  359. log.Info("Receive Pong")
  360. case packetTypeMsg:
  361. pkt, n, err := msgPacket{}, int64(0), error(nil)
  362. wire.ReadBinaryPtr(&pkt, c.bufReader, &n, &err)
  363. c.recvMonitor.Update(int(n))
  364. if err != nil {
  365. if c.IsRunning() {
  366. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  367. c.stopForError(err)
  368. }
  369. break FOR_LOOP
  370. }
  371. channel, ok := c.channelsIdx[pkt.ChannelId]
  372. if !ok || channel == nil {
  373. PanicQ(Fmt("Unknown channel %X", pkt.ChannelId))
  374. }
  375. msgBytes, err := channel.recvMsgPacket(pkt)
  376. if err != nil {
  377. if c.IsRunning() {
  378. log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
  379. c.stopForError(err)
  380. }
  381. break FOR_LOOP
  382. }
  383. if msgBytes != nil {
  384. log.Debug("Received bytes", "chId", pkt.ChannelId, "msgBytes", msgBytes)
  385. c.onReceive(pkt.ChannelId, msgBytes)
  386. }
  387. default:
  388. PanicSanity(Fmt("Unknown message type %X", pktType))
  389. }
  390. // TODO: shouldn't this go in the sendRoutine?
  391. // Better to send a ping packet when *we* haven't sent anything for a while.
  392. c.pingTimer.Reset()
  393. }
  394. // Cleanup
  395. close(c.pong)
  396. for _ = range c.pong {
  397. // Drain
  398. }
  399. }
  400. //-----------------------------------------------------------------------------
// ChannelDescriptor configures one Channel of an MConnection.
type ChannelDescriptor struct {
	Id                 byte // channel id; key into MConnection.channelsIdx
	Priority           int  // must be > 0 (newChannel panics otherwise); divisor in the send scheduling ratio
	SendQueueCapacity  int  // capacity of the channel's send queue; 0 is replaced by FillDefaults
	RecvBufferCapacity int  // initial capacity of the receive buffer; 0 is replaced by FillDefaults
}
  407. func (chDesc *ChannelDescriptor) FillDefaults() {
  408. if chDesc.SendQueueCapacity == 0 {
  409. chDesc.SendQueueCapacity = defaultSendQueueCapacity
  410. }
  411. if chDesc.RecvBufferCapacity == 0 {
  412. chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
  413. }
  414. }
// Channel is one logical message stream multiplexed over an MConnection.
// Outbound messages wait in sendQueue and are cut into msgPackets; inbound
// msgPackets are appended to recving until a packet marked EOF completes
// the message.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection       // owning connection
	desc          *ChannelDescriptor // descriptor this channel was built from
	id            byte               // copy of desc.Id
	sendQueue     chan []byte        // serialized messages waiting to be sent
	sendQueueSize int32              // atomic. messages queued or in flight; decremented when a message's last packet is built
	recving       []byte             // bytes of the inbound message assembled so far
	sending       []byte             // unsent remainder of the outbound message being packetized
	priority      int                // copy of desc.Priority
	recentlySent  int64              // exponential moving average; halved by updateStats
}
  428. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  429. desc.FillDefaults()
  430. if desc.Priority <= 0 {
  431. PanicSanity("Channel default priority must be a postive integer")
  432. }
  433. return &Channel{
  434. conn: conn,
  435. desc: desc,
  436. id: desc.Id,
  437. sendQueue: make(chan []byte, desc.SendQueueCapacity),
  438. recving: make([]byte, 0, desc.RecvBufferCapacity),
  439. priority: desc.Priority,
  440. }
  441. }
  442. // Queues message to send to this channel.
  443. // Goroutine-safe
  444. // Times out (and returns false) after defaultSendTimeoutSeconds
  445. func (ch *Channel) sendBytes(bytes []byte) bool {
  446. timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second)
  447. select {
  448. case <-timeout.C:
  449. // timeout
  450. return false
  451. case ch.sendQueue <- bytes:
  452. atomic.AddInt32(&ch.sendQueueSize, 1)
  453. return true
  454. }
  455. }
  456. // Queues message to send to this channel.
  457. // Nonblocking, returns true if successful.
  458. // Goroutine-safe
  459. func (ch *Channel) trySendBytes(bytes []byte) bool {
  460. select {
  461. case ch.sendQueue <- bytes:
  462. atomic.AddInt32(&ch.sendQueueSize, 1)
  463. return true
  464. default:
  465. return false
  466. }
  467. }
  468. // Goroutine-safe
  469. func (ch *Channel) loadSendQueueSize() (size int) {
  470. return int(atomic.LoadInt32(&ch.sendQueueSize))
  471. }
  472. // Goroutine-safe
  473. // Use only as a heuristic.
  474. func (ch *Channel) canSend() bool {
  475. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  476. }
  477. // Returns true if any msgPackets are pending to be sent.
  478. // Call before calling nextMsgPacket()
  479. // Goroutine-safe
  480. func (ch *Channel) isSendPending() bool {
  481. if len(ch.sending) == 0 {
  482. if len(ch.sendQueue) == 0 {
  483. return false
  484. }
  485. ch.sending = <-ch.sendQueue
  486. }
  487. return true
  488. }
  489. // Creates a new msgPacket to send.
  490. // Not goroutine-safe
  491. func (ch *Channel) nextMsgPacket() msgPacket {
  492. packet := msgPacket{}
  493. packet.ChannelId = byte(ch.id)
  494. packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
  495. if len(ch.sending) <= maxMsgPacketSize {
  496. packet.EOF = byte(0x01)
  497. ch.sending = nil
  498. atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
  499. } else {
  500. packet.EOF = byte(0x00)
  501. ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):]
  502. }
  503. return packet
  504. }
  505. // Writes next msgPacket to w.
  506. // Not goroutine-safe
  507. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
  508. packet := ch.nextMsgPacket()
  509. log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
  510. wire.WriteByte(packetTypeMsg, w, &n, &err)
  511. wire.WriteBinary(packet, w, &n, &err)
  512. if err != nil {
  513. ch.recentlySent += n
  514. }
  515. return
  516. }
  517. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  518. // Not goroutine-safe
  519. func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
  520. log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
  521. if wire.MaxBinaryReadSize < len(ch.recving)+len(packet.Bytes) {
  522. return nil, wire.ErrBinaryReadSizeOverflow
  523. }
  524. ch.recving = append(ch.recving, packet.Bytes...)
  525. if packet.EOF == byte(0x01) {
  526. msgBytes := ch.recving
  527. ch.recving = make([]byte, 0, defaultRecvBufferCapacity)
  528. return msgBytes, nil
  529. }
  530. return nil, nil
  531. }
  532. // Call this periodically to update stats for throttling purposes.
  533. // Not goroutine-safe
  534. func (ch *Channel) updateStats() {
  535. // Exponential decay of stats.
  536. // TODO: optimize.
  537. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  538. }
  539. //-----------------------------------------------------------------------------
// Packet framing: every packet on the connection starts with one of the
// packetType* bytes.
const (
	maxMsgPacketSize = 1024       // maximum number of message bytes carried by one msgPacket
	packetTypePing   = byte(0x01) // ping; the receiver answers with a pong
	packetTypePong   = byte(0x02) // pong; ignored by the receiver apart from logging
	packetTypeMsg    = byte(0x03) // followed by a serialized msgPacket
)
  546. // Messages in channels are chopped into smaller msgPackets for multiplexing.
  547. type msgPacket struct {
  548. ChannelId byte
  549. EOF byte // 1 means message ends here.
  550. Bytes []byte
  551. }
  552. func (p msgPacket) String() string {
  553. return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelId, p.Bytes, p.EOF)
  554. }
  555. //-----------------------------------------------------------------------------
  556. // Convenience struct for writing typed messages.
  557. // Reading requires a custom decoder that switches on the first type byte of a byteslice.
  558. type TypedMessage struct {
  559. Type byte
  560. Msg interface{}
  561. }
  562. func (tm TypedMessage) String() string {
  563. return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg)
  564. }