You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

587 lines
15 KiB

11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "runtime/debug"
  9. "sync/atomic"
  10. "time"
  11. flow "code.google.com/p/mxk/go1/flowcontrol"
  12. "github.com/op/go-logging"
  13. . "github.com/tendermint/tendermint/binary"
  14. . "github.com/tendermint/tendermint/common"
  15. )
// Tunables for MConnection and its channels.
const (
	// Upper bound on the number of msgPackets written by one call to
	// sendSomeMsgPackets().
	numBatchMsgPackets = 10
	// Buffer size handed to bufio.NewReaderSize for the connection.
	minReadBufferSize = 1024
	// Buffer size handed to bufio.NewWriterSize for the connection.
	minWriteBufferSize = 1024
	// Throttle interval, in milliseconds, of MConnection.flushTimer.
	flushThrottleMS = 50
	// NOTE(review): not referenced anywhere in the code visible in this file.
	idleTimeoutMinutes = 5
	// Period, in seconds, of MConnection.chStatsTimer (channel stats decay).
	updateStatsSeconds = 2
	// Period, in minutes, of MConnection.pingTimer.
	pingTimeoutMinutes = 2
	// 51200 = 50 * 1024. This value is passed as the rate argument of
	// sendMonitor.Limit(); assuming that unit is bytes per second this is
	// 50 KiB/s (the former "5Kb/s" note was off by a factor of ten).
	defaultSendRate = 51200
	// 51200 = 50 * 1024. Passed as the rate argument of recvMonitor.Limit();
	// see defaultSendRate for the unit caveat.
	defaultRecvRate = 51200
	// Buffer size of each Channel.sendQueue, i.e. how many whole messages can
	// be queued per channel before Send() blocks and TrySend() fails.
	defaultSendQueueCapacity = 1
	// Initial capacity of each Channel.recving reassembly buffer.
	defaultRecvBufferCapacity = 4096
)
// receiveCbFunc is the callback recvRoutine invokes with the channel id and
// the bytes of every fully reassembled inbound message.
type receiveCbFunc func(chId byte, msgBytes []byte)

// errorCbFunc is the callback stopForError invokes (at most once per
// connection) with the error or recovered panic value that stopped it.
type errorCbFunc func(interface{})
/*
Each peer has one `MConnection` (multiplex connection) instance.

__multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication.

Each `MConnection` handles message transmission on multiple abstract communication
`Channel`s.  Each channel has a globally unique byte id.
The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection.

There are two methods for sending messages:

	func (c *MConnection) Send(chId byte, msg interface{}) bool {}
	func (c *MConnection) TrySend(chId byte, msg interface{}) bool {}

`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chId`.  The message `msg` is serialized
with `BinaryBytes()` from the `tendermint/binary` submodule.

`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
queue is full.

Both return false when the connection is already stopped or `chId` is unknown.

Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
	// Underlying network connection; closed by Stop().
	conn net.Conn
	// Buffered reader over conn, used only by recvRoutine.
	bufReader *bufio.Reader
	// Buffered writer over conn, written by sendRoutine and flushed by flush().
	bufWriter *bufio.Writer

	// Flow monitor updated with every byte count written by the send side.
	sendMonitor *flow.Monitor
	// Flow monitor updated with every byte count read by the receive side.
	recvMonitor *flow.Monitor
	// Rate passed to sendMonitor.Limit(); read atomically.
	sendRate int64
	// Rate passed to recvMonitor.Limit(); read atomically.
	recvRate int64

	// Flush writes as necessary but throttled.
	flushTimer *ThrottleTimer
	// Wake-up signal for sendRoutine (buffered with capacity 1 so that
	// signalling never blocks).
	send chan struct{}
	// Closed by Stop() to tell sendRoutine to exit.
	quit chan struct{}
	// Send pings periodically.
	pingTimer *RepeatTimer
	// recvRoutine sends on this channel for every ping it reads; sendRoutine
	// receives from it. Closed by recvRoutine on exit.
	pong chan struct{}
	// Update channel stats periodically.
	chStatsTimer *RepeatTimer

	// All channels, in the order of the descriptors given to NewMConnection.
	channels []*Channel
	// The same channels, indexed by channel id.
	channelsIdx map[byte]*Channel

	// Called by recvRoutine with each completed inbound message.
	onReceive receiveCbFunc
	// Called at most once by stopForError; may be nil.
	onError errorCbFunc

	// One-shot flags (0 or 1), flipped with atomic compare-and-swap by
	// Start(), Stop() and stopForError() respectively.
	started uint32
	stopped uint32
	errored uint32

	// Built from conn.LocalAddr() in NewMConnection.
	LocalAddress *NetAddress
	// Built from conn.RemoteAddr() in NewMConnection.
	RemoteAddress *NetAddress
}
  73. func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
  74. mconn := &MConnection{
  75. conn: conn,
  76. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  77. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  78. sendMonitor: flow.New(0, 0),
  79. recvMonitor: flow.New(0, 0),
  80. sendRate: defaultSendRate,
  81. recvRate: defaultRecvRate,
  82. flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond),
  83. send: make(chan struct{}, 1),
  84. quit: make(chan struct{}),
  85. pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
  86. pong: make(chan struct{}),
  87. chStatsTimer: NewRepeatTimer(updateStatsSeconds * time.Second),
  88. onReceive: onReceive,
  89. onError: onError,
  90. LocalAddress: NewNetAddress(conn.LocalAddr()),
  91. RemoteAddress: NewNetAddress(conn.RemoteAddr()),
  92. }
  93. // Create channels
  94. var channelsIdx = map[byte]*Channel{}
  95. var channels = []*Channel{}
  96. for _, desc := range chDescs {
  97. channel := newChannel(mconn, desc)
  98. channelsIdx[channel.id] = channel
  99. channels = append(channels, channel)
  100. }
  101. mconn.channels = channels
  102. mconn.channelsIdx = channelsIdx
  103. return mconn
  104. }
  105. // .Start() begins multiplexing packets to and from "channels".
  106. func (c *MConnection) Start() {
  107. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  108. log.Debug("Starting %v", c)
  109. go c.sendRoutine()
  110. go c.recvRoutine()
  111. }
  112. }
  113. func (c *MConnection) Stop() {
  114. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  115. log.Debug("Stopping %v", c)
  116. close(c.quit)
  117. c.conn.Close()
  118. c.flushTimer.Stop()
  119. c.chStatsTimer.Stop()
  120. c.pingTimer.Stop()
  121. // We can't close pong safely here because
  122. // recvRoutine may write to it after we've stopped.
  123. // Though it doesn't need to get closed at all,
  124. // we close it @ recvRoutine.
  125. // close(c.pong)
  126. }
  127. }
  128. func (c *MConnection) String() string {
  129. return fmt.Sprintf("MConn{%v}", c.conn.RemoteAddr())
  130. }
  131. func (c *MConnection) flush() {
  132. err := c.bufWriter.Flush()
  133. if err != nil {
  134. if atomic.LoadUint32(&c.stopped) != 1 {
  135. log.Warning("MConnection flush failed: %v", err)
  136. }
  137. }
  138. }
  139. // Catch panics, usually caused by remote disconnects.
  140. func (c *MConnection) _recover() {
  141. if r := recover(); r != nil {
  142. stack := debug.Stack()
  143. err := StackError{r, stack}
  144. c.stopForError(err)
  145. }
  146. }
  147. func (c *MConnection) stopForError(r interface{}) {
  148. c.Stop()
  149. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  150. if c.onError != nil {
  151. c.onError(r)
  152. }
  153. }
  154. }
  155. // Queues a message to be sent to channel.
  156. func (c *MConnection) Send(chId byte, msg interface{}) bool {
  157. if atomic.LoadUint32(&c.stopped) == 1 {
  158. return false
  159. }
  160. log.Debug("[%X] Send to %v: %v", chId, c, msg)
  161. // Send message to channel.
  162. channel, ok := c.channelsIdx[chId]
  163. if !ok {
  164. log.Error("Cannot send bytes, unknown channel %X", chId)
  165. return false
  166. }
  167. channel.sendBytes(BinaryBytes(msg))
  168. // Wake up sendRoutine if necessary
  169. select {
  170. case c.send <- struct{}{}:
  171. default:
  172. }
  173. return true
  174. }
  175. // Queues a message to be sent to channel.
  176. // Nonblocking, returns true if successful.
  177. func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
  178. if atomic.LoadUint32(&c.stopped) == 1 {
  179. return false
  180. }
  181. log.Debug("[%X] TrySend to %v: %v", chId, c, msg)
  182. // Send message to channel.
  183. channel, ok := c.channelsIdx[chId]
  184. if !ok {
  185. log.Error("Cannot send bytes, unknown channel %X", chId)
  186. return false
  187. }
  188. ok = channel.trySendBytes(BinaryBytes(msg))
  189. if ok {
  190. // Wake up sendRoutine if necessary
  191. select {
  192. case c.send <- struct{}{}:
  193. default:
  194. }
  195. }
  196. return ok
  197. }
  198. func (c *MConnection) CanSend(chId byte) bool {
  199. if atomic.LoadUint32(&c.stopped) == 1 {
  200. return false
  201. }
  202. channel, ok := c.channelsIdx[chId]
  203. if !ok {
  204. log.Error("Unknown channel %X", chId)
  205. return false
  206. }
  207. return channel.canSend()
  208. }
  209. // sendRoutine polls for packets to send from channels.
  210. func (c *MConnection) sendRoutine() {
  211. defer c._recover()
  212. FOR_LOOP:
  213. for {
  214. var n int64
  215. var err error
  216. select {
  217. case <-c.flushTimer.Ch:
  218. // NOTE: flushTimer.Set() must be called every time
  219. // something is written to .bufWriter.
  220. c.flush()
  221. case <-c.chStatsTimer.Ch:
  222. for _, channel := range c.channels {
  223. channel.updateStats()
  224. }
  225. case <-c.pingTimer.Ch:
  226. WriteByte(packetTypePing, c.bufWriter, &n, &err)
  227. c.sendMonitor.Update(int(n))
  228. c.flush()
  229. case <-c.pong:
  230. WriteByte(packetTypePing, c.bufWriter, &n, &err)
  231. c.sendMonitor.Update(int(n))
  232. c.flush()
  233. case <-c.quit:
  234. break FOR_LOOP
  235. case <-c.send:
  236. // Send some msgPackets
  237. eof := c.sendSomeMsgPackets()
  238. if !eof {
  239. // Keep sendRoutine awake.
  240. select {
  241. case c.send <- struct{}{}:
  242. default:
  243. }
  244. }
  245. }
  246. if atomic.LoadUint32(&c.stopped) == 1 {
  247. break FOR_LOOP
  248. }
  249. if err != nil {
  250. log.Warning("%v failed @ sendRoutine:\n%v", c, err)
  251. c.Stop()
  252. break FOR_LOOP
  253. }
  254. }
  255. // Cleanup
  256. }
  257. // Returns true if messages from channels were exhausted.
  258. // Blocks in accordance to .sendMonitor throttling.
  259. func (c *MConnection) sendSomeMsgPackets() bool {
  260. // Block until .sendMonitor says we can write.
  261. // Once we're ready we send more than we asked for,
  262. // but amortized it should even out.
  263. c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true)
  264. // Now send some msgPackets.
  265. for i := 0; i < numBatchMsgPackets; i++ {
  266. if c.sendMsgPacket() {
  267. return true
  268. }
  269. }
  270. return false
  271. }
  272. // Returns true if messages from channels were exhausted.
  273. func (c *MConnection) sendMsgPacket() bool {
  274. // Choose a channel to create a msgPacket from.
  275. // The chosen channel will be the one whose recentlySent/priority is the least.
  276. var leastRatio float32 = math.MaxFloat32
  277. var leastChannel *Channel
  278. for _, channel := range c.channels {
  279. // If nothing to send, skip this channel
  280. if !channel.isSendPending() {
  281. continue
  282. }
  283. // Get ratio, and keep track of lowest ratio.
  284. ratio := float32(channel.recentlySent) / float32(channel.priority)
  285. if ratio < leastRatio {
  286. leastRatio = ratio
  287. leastChannel = channel
  288. }
  289. }
  290. // Nothing to send?
  291. if leastChannel == nil {
  292. return true
  293. } else {
  294. // log.Debug("Found a msgPacket to send")
  295. }
  296. // Make & send a msgPacket from this channel
  297. n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
  298. if err != nil {
  299. log.Warning("Failed to write msgPacket. Error: %v", err)
  300. c.stopForError(err)
  301. return true
  302. }
  303. c.sendMonitor.Update(int(n))
  304. c.flushTimer.Set()
  305. return false
  306. }
  307. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
  308. // After a whole message has been assembled, it's pushed to onReceive().
  309. // Blocks depending on how the connection is throttled.
  310. func (c *MConnection) recvRoutine() {
  311. defer c._recover()
  312. FOR_LOOP:
  313. for {
  314. // Block until .recvMonitor says we can read.
  315. c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true)
  316. // Read packet type
  317. var n int64
  318. var err error
  319. pktType := ReadByte(c.bufReader, &n, &err)
  320. c.recvMonitor.Update(int(n))
  321. if err != nil {
  322. if atomic.LoadUint32(&c.stopped) != 1 {
  323. log.Warning("%v failed @ recvRoutine with err: %v", c, err)
  324. c.Stop()
  325. }
  326. break FOR_LOOP
  327. }
  328. // Peek into bufReader for debugging
  329. if log.IsEnabledFor(logging.DEBUG) {
  330. numBytes := c.bufReader.Buffered()
  331. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  332. if err == nil {
  333. log.Debug("recvRoutine packet type %X, peeked: %X", pktType, bytes)
  334. }
  335. }
  336. // Read more depending on packet type.
  337. switch pktType {
  338. case packetTypePing:
  339. // TODO: prevent abuse, as they cause flush()'s.
  340. c.pong <- struct{}{}
  341. case packetTypePong:
  342. // do nothing
  343. case packetTypeMsg:
  344. pkt, n, err := msgPacket{}, new(int64), new(error)
  345. ReadBinary(&pkt, c.bufReader, n, err)
  346. c.recvMonitor.Update(int(*n))
  347. if *err != nil {
  348. if atomic.LoadUint32(&c.stopped) != 1 {
  349. log.Warning("%v failed @ recvRoutine", c)
  350. c.Stop()
  351. }
  352. break FOR_LOOP
  353. }
  354. channel, ok := c.channelsIdx[pkt.ChannelId]
  355. if !ok || channel == nil {
  356. Panicf("Unknown channel %X", pkt.ChannelId)
  357. }
  358. msgBytes := channel.recvMsgPacket(pkt)
  359. if msgBytes != nil {
  360. c.onReceive(pkt.ChannelId, msgBytes)
  361. }
  362. default:
  363. Panicf("Unknown message type %X", pktType)
  364. }
  365. // TODO: shouldn't this go in the sendRoutine?
  366. // Better to send a ping packet when *we* haven't sent anything for a while.
  367. c.pingTimer.Reset()
  368. }
  369. // Cleanup
  370. close(c.pong)
  371. for _ = range c.pong {
  372. // Drain
  373. }
  374. }
  375. //-----------------------------------------------------------------------------
// ChannelDescriptor configures one Channel of an MConnection.
type ChannelDescriptor struct {
	// Byte id of the channel; used as the key of MConnection.channelsIdx and
	// written into every msgPacket of the channel.
	Id byte
	// Relative send priority. Must be non-zero (newChannel panics otherwise);
	// the channel with the lowest recentlySent/priority ratio is served first.
	Priority uint
}
// Channel is one logical message stream multiplexed over an MConnection.
// It buffers outbound messages, chops them into msgPackets, and reassembles
// inbound msgPackets into whole messages.
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	// Owning connection.
	conn *MConnection
	// Descriptor this channel was created from.
	desc *ChannelDescriptor
	// Copy of desc.Id.
	id byte
	// Whole serialized messages waiting to be sent
	// (buffered with capacity defaultSendQueueCapacity).
	sendQueue chan []byte
	// Count of messages queued or in flight: incremented after a message is
	// enqueued and decremented when its last msgPacket is produced.
	// Accessed with atomic operations.
	sendQueueSize uint32
	// Bytes of the inbound message being reassembled.
	recving []byte
	// Remaining bytes of the outbound message currently being packetized.
	sending []byte
	// Copy of desc.Priority.
	priority uint
	// Exponential moving average; halved by updateStats().
	recentlySent int64
}
  393. func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
  394. if desc.Priority <= 0 {
  395. panic("Channel default priority must be a postive integer")
  396. }
  397. return &Channel{
  398. conn: conn,
  399. desc: desc,
  400. id: desc.Id,
  401. sendQueue: make(chan []byte, defaultSendQueueCapacity),
  402. recving: make([]byte, 0, defaultRecvBufferCapacity),
  403. priority: desc.Priority,
  404. }
  405. }
// sendBytes queues a serialized message on this channel, blocking while the
// send queue is full.
// Goroutine-safe
func (ch *Channel) sendBytes(bytes []byte) {
	ch.sendQueue <- bytes
	// The counter is bumped only after the message is actually in the queue.
	atomic.AddUint32(&ch.sendQueueSize, 1)
}
  412. // Queues message to send to this channel.
  413. // Nonblocking, returns true if successful.
  414. // Goroutine-safe
  415. func (ch *Channel) trySendBytes(bytes []byte) bool {
  416. select {
  417. case ch.sendQueue <- bytes:
  418. atomic.AddUint32(&ch.sendQueueSize, 1)
  419. return true
  420. default:
  421. return false
  422. }
  423. }
  424. // Goroutine-safe
  425. func (ch *Channel) loadSendQueueSize() (size int) {
  426. return int(atomic.LoadUint32(&ch.sendQueueSize))
  427. }
  428. // Goroutine-safe
  429. // Use only as a heuristic.
  430. func (ch *Channel) canSend() bool {
  431. return ch.loadSendQueueSize() < defaultSendQueueCapacity
  432. }
  433. // Returns true if any msgPackets are pending to be sent.
  434. // Call before calling nextMsgPacket()
  435. // Goroutine-safe
  436. func (ch *Channel) isSendPending() bool {
  437. if len(ch.sending) == 0 {
  438. if len(ch.sendQueue) == 0 {
  439. return false
  440. }
  441. ch.sending = <-ch.sendQueue
  442. }
  443. return true
  444. }
  445. // Creates a new msgPacket to send.
  446. // Not goroutine-safe
  447. func (ch *Channel) nextMsgPacket() msgPacket {
  448. packet := msgPacket{}
  449. packet.ChannelId = byte(ch.id)
  450. packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
  451. if len(ch.sending) <= maxMsgPacketSize {
  452. packet.EOF = byte(0x01)
  453. ch.sending = nil
  454. atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
  455. } else {
  456. packet.EOF = byte(0x00)
  457. ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):]
  458. }
  459. return packet
  460. }
  461. // Writes next msgPacket to w.
  462. // Not goroutine-safe
  463. func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
  464. packet := ch.nextMsgPacket()
  465. WriteByte(packetTypeMsg, w, &n, &err)
  466. WriteBinary(packet, w, &n, &err)
  467. if err != nil {
  468. ch.recentlySent += n
  469. }
  470. return
  471. }
  472. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
  473. // Not goroutine-safe
  474. func (ch *Channel) recvMsgPacket(pkt msgPacket) []byte {
  475. ch.recving = append(ch.recving, pkt.Bytes...)
  476. if pkt.EOF == byte(0x01) {
  477. msgBytes := ch.recving
  478. ch.recving = make([]byte, 0, defaultRecvBufferCapacity)
  479. return msgBytes
  480. }
  481. return nil
  482. }
  483. // Call this periodically to update stats for throttling purposes.
  484. // Not goroutine-safe
  485. func (ch *Channel) updateStats() {
  486. // Exponential decay of stats.
  487. // TODO: optimize.
  488. ch.recentlySent = int64(float64(ch.recentlySent) * 0.5)
  489. }
  490. //-----------------------------------------------------------------------------
// Wire-level constants.
const (
	// Maximum number of message bytes carried by one msgPacket; also the
	// amount requested from the flow monitors before each send/receive.
	maxMsgPacketSize = 1024
	// Packet type byte: keep-alive request.
	packetTypePing = byte(0x00)
	// Packet type byte: keep-alive reply (ignored by the receiver).
	packetTypePong = byte(0x01)
	// Packet type byte: a msgPacket follows.
	packetTypeMsg = byte(0x10)
)
// Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
	// Id of the channel the packet belongs to.
	ChannelId byte
	// 1 means message ends here; 0 means more packets follow.
	EOF byte
	// Slice of the message carried by this packet
	// (at most maxMsgPacketSize bytes when produced by nextMsgPacket()).
	Bytes []byte
}
  503. func (p msgPacket) String() string {
  504. return fmt.Sprintf("MsgPacket{%X:%X}", p.ChannelId, p.Bytes)
  505. }
  506. //-----------------------------------------------------------------------------
// TypedMessage is a convenience struct for writing typed messages: a type
// byte followed by the message itself.
// Reading requires a custom decoder that switches on the first type byte of a byteslice.
type TypedMessage struct {
	// Type tag identifying the kind of Msg.
	Type byte
	// The message payload.
	Msg interface{}
}
  513. func (tm TypedMessage) String() string {
  514. return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg)
  515. }