You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
4.7 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package peer
  2. import (
  3. . "github.com/tendermint/tendermint/common"
  4. . "github.com/tendermint/tendermint/binary"
  5. "sync/atomic"
  6. "net"
  7. "time"
  8. "fmt"
  9. )
  10. const (
  11. OUT_QUEUE_SIZE = 50
  12. IDLE_TIMEOUT_MINUTES = 5
  13. PING_TIMEOUT_MINUTES = 2
  14. )
  15. /* Connnection */
  16. type Connection struct {
  17. ioStats IOStats
  18. sendQueue chan Packet // never closes
  19. conn net.Conn
  20. quit chan struct{}
  21. stopped uint32
  22. pingDebouncer *Debouncer
  23. pong chan struct{}
  24. }
  25. var (
  26. PACKET_TYPE_PING = UInt8(0x00)
  27. PACKET_TYPE_PONG = UInt8(0x01)
  28. PACKET_TYPE_MSG = UInt8(0x10)
  29. )
  30. func NewConnection(conn net.Conn) *Connection {
  31. return &Connection{
  32. sendQueue: make(chan Packet, OUT_QUEUE_SIZE),
  33. conn: conn,
  34. quit: make(chan struct{}),
  35. pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute),
  36. pong: make(chan struct{}),
  37. }
  38. }
  39. // returns true if successfully queued,
  40. // returns false if connection was closed.
  41. // blocks.
  42. func (c *Connection) Send(pkt Packet) bool {
  43. select {
  44. case c.sendQueue <- pkt:
  45. return true
  46. case <-c.quit:
  47. return false
  48. }
  49. }
  50. func (c *Connection) Start(channels map[String]*Channel) {
  51. log.Debugf("Starting %v", c)
  52. go c.sendHandler()
  53. go c.recvHandler(channels)
  54. }
  55. func (c *Connection) Stop() {
  56. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  57. log.Debugf("Stopping %v", c)
  58. close(c.quit)
  59. c.conn.Close()
  60. c.pingDebouncer.Stop()
  61. // We can't close pong safely here because
  62. // recvHandler may write to it after we've stopped.
  63. // Though it doesn't need to get closed at all,
  64. // we close it @ recvHandler.
  65. // close(c.pong)
  66. }
  67. }
  68. func (c *Connection) LocalAddress() *NetAddress {
  69. return NewNetAddress(c.conn.LocalAddr())
  70. }
  71. func (c *Connection) RemoteAddress() *NetAddress {
  72. return NewNetAddress(c.conn.RemoteAddr())
  73. }
  74. func (c *Connection) String() string {
  75. return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
  76. }
  77. func (c *Connection) flush() {
  78. // TODO flush? (turn off nagel, turn back on, etc)
  79. }
  80. func (c *Connection) sendHandler() {
  81. log.Tracef("%v sendHandler", c)
  82. // TODO: catch panics & stop connection.
  83. FOR_LOOP:
  84. for {
  85. var err error
  86. select {
  87. case <-c.pingDebouncer.Ch:
  88. _, err = PACKET_TYPE_PING.WriteTo(c.conn)
  89. case sendPkt := <-c.sendQueue:
  90. log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
  91. _, err = PACKET_TYPE_MSG.WriteTo(c.conn)
  92. if err != nil { break }
  93. _, err = sendPkt.WriteTo(c.conn)
  94. case <-c.pong:
  95. _, err = PACKET_TYPE_PONG.WriteTo(c.conn)
  96. case <-c.quit:
  97. break FOR_LOOP
  98. }
  99. if err != nil {
  100. log.Infof("%v failed @ sendHandler:\n%v", c, err)
  101. c.Stop()
  102. break FOR_LOOP
  103. }
  104. c.flush()
  105. }
  106. log.Tracef("%v sendHandler done", c)
  107. // cleanup
  108. }
  109. func (c *Connection) recvHandler(channels map[String]*Channel) {
  110. log.Tracef("%v recvHandler with %v channels", c, len(channels))
  111. // TODO: catch panics & stop connection.
  112. FOR_LOOP:
  113. for {
  114. pktType, err := ReadUInt8Safe(c.conn)
  115. if err != nil {
  116. if atomic.LoadUint32(&c.stopped) != 1 {
  117. log.Infof("%v failed @ recvHandler", c)
  118. c.Stop()
  119. }
  120. break FOR_LOOP
  121. } else {
  122. log.Tracef("Found pktType %v", pktType)
  123. }
  124. switch pktType {
  125. case PACKET_TYPE_PING:
  126. c.pong <- struct{}{}
  127. case PACKET_TYPE_PONG:
  128. // do nothing
  129. case PACKET_TYPE_MSG:
  130. pkt, err := ReadPacketSafe(c.conn)
  131. if err != nil {
  132. if atomic.LoadUint32(&c.stopped) != 1 {
  133. log.Infof("%v failed @ recvHandler", c)
  134. c.Stop()
  135. }
  136. break FOR_LOOP
  137. }
  138. channel := channels[pkt.Channel]
  139. if channel == nil {
  140. Panicf("Unknown channel %v", pkt.Channel)
  141. }
  142. channel.recvQueue <- pkt
  143. default:
  144. Panicf("Unknown message type %v", pktType)
  145. }
  146. c.pingDebouncer.Reset()
  147. }
  148. log.Tracef("%v recvHandler done", c)
  149. // cleanup
  150. close(c.pong)
  151. for _ = range c.pong {
  152. // drain
  153. }
  154. }
  155. /* IOStats */
  156. type IOStats struct {
  157. TimeConnected Time
  158. LastSent Time
  159. LastRecv Time
  160. BytesRecv UInt64
  161. BytesSent UInt64
  162. PktsRecv UInt64
  163. PktsSent UInt64
  164. }