You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

249 lines
5.8 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package peer
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. . "github.com/tendermint/tendermint/binary"
  9. . "github.com/tendermint/tendermint/common"
  10. )
  // Tunables for a Connection's buffering, queueing and keep-alive timing.
  const (
  	// Sizes, in bytes, requested for the buffered reader and writer
  	// that wrap the underlying net.Conn.
  	READ_BUFFER_MIN_SIZE  = 1024
  	WRITE_BUFFER_MIN_SIZE = 1024

  	// FLUSH_THROTTLE_MS is the interval, in milliseconds, handed to the
  	// flush throttler.
  	FLUSH_THROTTLE_MS = 50

  	// OUT_QUEUE_SIZE is the capacity of the outbound packet queue.
  	OUT_QUEUE_SIZE = 50

  	// IDLE_TIMEOUT_MINUTES is not referenced by the code visible in this
  	// file. NOTE(review): presumably an idle cut-off in minutes; confirm
  	// against its users.
  	IDLE_TIMEOUT_MINUTES = 5

  	// PING_TIMEOUT_MINUTES is the period, in minutes, of the repeat timer
  	// that triggers outgoing pings.
  	PING_TIMEOUT_MINUTES = 2
  )
  19. // BUG(jae): Handle disconnects.
  20. /*
  21. A Connection wraps a network connection and handles buffering and multiplexing.
  22. "Packets" are sent with ".Send(Packet)".
  23. Packets received are sent to channels as commanded by the ".Start(...)" method.
  24. */
  25. type Connection struct {
  26. ioStats IOStats
  27. sendQueue chan Packet // never closes
  28. conn net.Conn
  29. bufReader *bufio.Reader
  30. bufWriter *bufio.Writer
  31. flushThrottler *Throttler
  32. quit chan struct{}
  33. pingRepeatTimer *RepeatTimer
  34. pong chan struct{}
  35. channels map[String]*Channel
  36. onError func(interface{})
  37. started uint32
  38. stopped uint32
  39. errored uint32
  40. }
  41. var (
  42. PACKET_TYPE_PING = UInt8(0x00)
  43. PACKET_TYPE_PONG = UInt8(0x01)
  44. PACKET_TYPE_MSG = UInt8(0x10)
  45. )
  46. func NewConnection(conn net.Conn) *Connection {
  47. return &Connection{
  48. sendQueue: make(chan Packet, OUT_QUEUE_SIZE),
  49. conn: conn,
  50. bufReader: bufio.NewReaderSize(conn, READ_BUFFER_MIN_SIZE),
  51. bufWriter: bufio.NewWriterSize(conn, WRITE_BUFFER_MIN_SIZE),
  52. flushThrottler: NewThrottler(FLUSH_THROTTLE_MS * time.Millisecond),
  53. quit: make(chan struct{}),
  54. pingRepeatTimer: NewRepeatTimer(PING_TIMEOUT_MINUTES * time.Minute),
  55. pong: make(chan struct{}),
  56. }
  57. }
  58. // .Start() begins multiplexing packets to and from "channels".
  59. // If an error occurs, the recovered reason is passed to "onError".
  60. func (c *Connection) Start(channels map[String]*Channel, onError func(interface{})) {
  61. log.Debugf("Starting %v", c)
  62. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  63. c.channels = channels
  64. c.onError = onError
  65. go c.sendHandler()
  66. go c.recvHandler()
  67. }
  68. }
  69. func (c *Connection) Stop() {
  70. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  71. log.Debugf("Stopping %v", c)
  72. close(c.quit)
  73. c.conn.Close()
  74. c.flushThrottler.Stop()
  75. c.pingRepeatTimer.Stop()
  76. // We can't close pong safely here because
  77. // recvHandler may write to it after we've stopped.
  78. // Though it doesn't need to get closed at all,
  79. // we close it @ recvHandler.
  80. // close(c.pong)
  81. }
  82. }
  83. func (c *Connection) LocalAddress() *NetAddress {
  84. return NewNetAddress(c.conn.LocalAddr())
  85. }
  86. func (c *Connection) RemoteAddress() *NetAddress {
  87. return NewNetAddress(c.conn.RemoteAddr())
  88. }
  89. // Returns true if successfully queued,
  90. // Returns false if connection was closed.
  91. // Blocks.
  92. func (c *Connection) Send(pkt Packet) bool {
  93. select {
  94. case c.sendQueue <- pkt:
  95. return true
  96. case <-c.quit:
  97. return false
  98. }
  99. }
  100. func (c *Connection) String() string {
  101. return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
  102. }
  103. func (c *Connection) flush() {
  104. // TODO: this is pretty naive.
  105. // We end up flushing when we don't have to (yet).
  106. // A better solution might require us implementing our own buffered writer.
  107. err := c.bufWriter.Flush()
  108. if err != nil {
  109. if atomic.LoadUint32(&c.stopped) != 1 {
  110. log.Warnf("Connection flush failed: %v", err)
  111. }
  112. }
  113. }
  114. // Catch panics, usually caused by remote disconnects.
  115. func (c *Connection) _recover() {
  116. if r := recover(); r != nil {
  117. c.Stop()
  118. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  119. if c.onError != nil {
  120. c.onError(r)
  121. }
  122. }
  123. }
  124. }
  125. // sendHandler pulls from .sendQueue and writes to .bufWriter
  126. func (c *Connection) sendHandler() {
  127. log.Tracef("%v sendHandler", c)
  128. defer c._recover()
  129. FOR_LOOP:
  130. for {
  131. var err error
  132. select {
  133. case sendPkt := <-c.sendQueue:
  134. log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
  135. _, err = PACKET_TYPE_MSG.WriteTo(c.bufWriter)
  136. if err != nil {
  137. break
  138. }
  139. _, err = sendPkt.WriteTo(c.bufWriter)
  140. c.flushThrottler.Set()
  141. case <-c.flushThrottler.Ch:
  142. c.flush()
  143. case <-c.pingRepeatTimer.Ch:
  144. _, err = PACKET_TYPE_PING.WriteTo(c.bufWriter)
  145. c.flush()
  146. case <-c.pong:
  147. _, err = PACKET_TYPE_PONG.WriteTo(c.bufWriter)
  148. c.flush()
  149. case <-c.quit:
  150. break FOR_LOOP
  151. }
  152. if atomic.LoadUint32(&c.stopped) == 1 {
  153. break FOR_LOOP
  154. }
  155. if err != nil {
  156. log.Infof("%v failed @ sendHandler:\n%v", c, err)
  157. c.Stop()
  158. break FOR_LOOP
  159. }
  160. }
  161. log.Tracef("%v sendHandler done", c)
  162. // cleanup
  163. }
  164. // recvHandler reads from .bufReader and pushes to the appropriate
  165. // channel's recvQueue.
  166. func (c *Connection) recvHandler() {
  167. log.Tracef("%v recvHandler", c)
  168. defer c._recover()
  169. FOR_LOOP:
  170. for {
  171. pktType, err := ReadUInt8Safe(c.bufReader)
  172. if err != nil {
  173. if atomic.LoadUint32(&c.stopped) != 1 {
  174. log.Infof("%v failed @ recvHandler", c)
  175. c.Stop()
  176. }
  177. break FOR_LOOP
  178. } else {
  179. log.Tracef("Found pktType %v", pktType)
  180. }
  181. switch pktType {
  182. case PACKET_TYPE_PING:
  183. // TODO: keep track of these, make sure it isn't abused
  184. // as they cause flush()'s in the send buffer.
  185. c.pong <- struct{}{}
  186. case PACKET_TYPE_PONG:
  187. // do nothing
  188. case PACKET_TYPE_MSG:
  189. pkt, err := ReadPacketSafe(c.bufReader)
  190. if err != nil {
  191. if atomic.LoadUint32(&c.stopped) != 1 {
  192. log.Infof("%v failed @ recvHandler", c)
  193. c.Stop()
  194. }
  195. break FOR_LOOP
  196. }
  197. channel := c.channels[pkt.Channel]
  198. if channel == nil {
  199. Panicf("Unknown channel %v", pkt.Channel)
  200. }
  201. channel.recvQueue <- pkt
  202. default:
  203. Panicf("Unknown message type %v", pktType)
  204. }
  205. c.pingRepeatTimer.Reset()
  206. }
  207. log.Tracef("%v recvHandler done", c)
  208. // cleanup
  209. close(c.pong)
  210. for _ = range c.pong {
  211. // drain
  212. }
  213. }
  214. /* IOStats */
  215. type IOStats struct {
  216. TimeConnected Time
  217. LastSent Time
  218. LastRecv Time
  219. BytesRecv UInt64
  220. BytesSent UInt64
  221. PktsRecv UInt64
  222. PktsSent UInt64
  223. }