You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

259 lines
6.1 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. "github.com/op/go-logging"
  9. . "github.com/tendermint/tendermint/binary"
  10. . "github.com/tendermint/tendermint/common"
  11. )
  // Buffer and queue sizing.
  const (
  	// minReadBufferSize is the size passed to bufio.NewReaderSize in NewConnection.
  	minReadBufferSize = 1024
  	// minWriteBufferSize is the size passed to bufio.NewWriterSize in NewConnection.
  	minWriteBufferSize = 1024
  	// outQueueSize is the capacity of Connection.sendQueue.
  	outQueueSize = 50
  )

  // Timing parameters.
  const (
  	// flushThrottleMS is the flush throttler period, in milliseconds.
  	flushThrottleMS = 50
  	// idleTimeoutMinutes is not referenced by the code visible in this file.
  	idleTimeoutMinutes = 5
  	// pingTimeoutMinutes is the period of the ping repeat timer, in minutes.
  	pingTimeoutMinutes = 2
  )
  20. /*
  21. A Connection wraps a network connection and handles buffering and multiplexing.
  22. "Packets" are sent with ".Send(Packet)".
  23. Packets received are sent to channels as commanded by the ".Start(...)" method.
  24. */
  25. type Connection struct {
  26. ioStats IOStats
  27. sendQueue chan Packet // never closes
  28. conn net.Conn
  29. bufReader *bufio.Reader
  30. bufWriter *bufio.Writer
  31. flushThrottler *Throttler
  32. quit chan struct{}
  33. pingRepeatTimer *RepeatTimer
  34. pong chan struct{}
  35. channels map[string]*Channel
  36. onError func(interface{})
  37. started uint32
  38. stopped uint32
  39. errored uint32
  40. }
  41. const (
  42. packetTypePing = UInt8(0x00)
  43. packetTypePong = UInt8(0x01)
  44. packetTypeMessage = UInt8(0x10)
  45. )
  46. func NewConnection(conn net.Conn) *Connection {
  47. return &Connection{
  48. sendQueue: make(chan Packet, outQueueSize),
  49. conn: conn,
  50. bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
  51. bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
  52. flushThrottler: NewThrottler(flushThrottleMS * time.Millisecond),
  53. quit: make(chan struct{}),
  54. pingRepeatTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
  55. pong: make(chan struct{}),
  56. }
  57. }
  58. // .Start() begins multiplexing packets to and from "channels".
  59. // If an error occurs, the recovered reason is passed to "onError".
  60. func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) {
  61. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  62. log.Debug("Starting %v", c)
  63. c.channels = channels
  64. c.onError = onError
  65. go c.sendHandler()
  66. go c.recvHandler()
  67. }
  68. }
  69. func (c *Connection) Stop() {
  70. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  71. log.Debug("Stopping %v", c)
  72. close(c.quit)
  73. c.conn.Close()
  74. c.flushThrottler.Stop()
  75. c.pingRepeatTimer.Stop()
  76. // We can't close pong safely here because
  77. // recvHandler may write to it after we've stopped.
  78. // Though it doesn't need to get closed at all,
  79. // we close it @ recvHandler.
  80. // close(c.pong)
  81. }
  82. }
  83. func (c *Connection) LocalAddress() *NetAddress {
  84. return NewNetAddress(c.conn.LocalAddr())
  85. }
  86. func (c *Connection) RemoteAddress() *NetAddress {
  87. return NewNetAddress(c.conn.RemoteAddr())
  88. }
  89. // Returns true if successfully queued,
  90. // Returns false if connection was closed.
  91. // Blocks.
  92. func (c *Connection) Send(pkt Packet) bool {
  93. select {
  94. case c.sendQueue <- pkt:
  95. return true
  96. case <-c.quit:
  97. return false
  98. }
  99. }
  100. func (c *Connection) String() string {
  101. return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
  102. }
  103. func (c *Connection) flush() {
  104. // TODO: this is pretty naive.
  105. // We end up flushing when we don't have to (yet).
  106. // A better solution might require us implementing our own buffered writer.
  107. err := c.bufWriter.Flush()
  108. if err != nil {
  109. if atomic.LoadUint32(&c.stopped) != 1 {
  110. log.Warning("Connection flush failed: %v", err)
  111. }
  112. }
  113. }
  114. // Catch panics, usually caused by remote disconnects.
  115. func (c *Connection) _recover() {
  116. if r := recover(); r != nil {
  117. c.Stop()
  118. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  119. if c.onError != nil {
  120. c.onError(r)
  121. }
  122. }
  123. }
  124. }
  125. // sendHandler pulls from .sendQueue and writes to .bufWriter
  126. func (c *Connection) sendHandler() {
  127. log.Debug("%v sendHandler", c)
  128. defer c._recover()
  129. FOR_LOOP:
  130. for {
  131. var err error
  132. select {
  133. case sendPkt := <-c.sendQueue:
  134. log.Debug("Found pkt from sendQueue. Writing pkt to underlying connection")
  135. _, err = packetTypeMessage.WriteTo(c.bufWriter)
  136. if err != nil {
  137. break
  138. }
  139. _, err = sendPkt.WriteTo(c.bufWriter)
  140. c.flushThrottler.Set()
  141. case <-c.flushThrottler.Ch:
  142. c.flush()
  143. case <-c.pingRepeatTimer.Ch:
  144. _, err = packetTypePing.WriteTo(c.bufWriter)
  145. log.Debug("Send [Ping] -> %v", c)
  146. c.flush()
  147. case <-c.pong:
  148. _, err = packetTypePong.WriteTo(c.bufWriter)
  149. log.Debug("Send [Pong] -> %v", c)
  150. c.flush()
  151. case <-c.quit:
  152. break FOR_LOOP
  153. }
  154. if atomic.LoadUint32(&c.stopped) == 1 {
  155. break FOR_LOOP
  156. }
  157. if err != nil {
  158. log.Info("%v failed @ sendHandler:\n%v", c, err)
  159. c.Stop()
  160. break FOR_LOOP
  161. }
  162. }
  163. log.Debug("%v sendHandler done", c)
  164. // cleanup
  165. }
  166. // recvHandler reads from .bufReader and pushes to the appropriate
  167. // channel's recvQueue.
  168. func (c *Connection) recvHandler() {
  169. log.Debug("%v recvHandler", c)
  170. defer c._recover()
  171. FOR_LOOP:
  172. for {
  173. pktType, err := ReadUInt8Safe(c.bufReader)
  174. if log.IsEnabledFor(logging.DEBUG) {
  175. // peeking into bufReader
  176. numBytes := c.bufReader.Buffered()
  177. bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
  178. if err != nil {
  179. log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes)
  180. }
  181. }
  182. if err != nil {
  183. if atomic.LoadUint32(&c.stopped) != 1 {
  184. log.Info("%v failed @ recvHandler with err: %v", c, err)
  185. c.Stop()
  186. }
  187. break FOR_LOOP
  188. } else {
  189. log.Debug("Found pktType %v", pktType)
  190. }
  191. switch pktType {
  192. case packetTypePing:
  193. // TODO: keep track of these, make sure it isn't abused
  194. // as they cause flush()'s in the send buffer.
  195. c.pong <- struct{}{}
  196. case packetTypePong:
  197. // do nothing
  198. log.Debug("[%v] Received Pong", c)
  199. case packetTypeMessage:
  200. pkt, err := ReadPacketSafe(c.bufReader)
  201. if err != nil {
  202. if atomic.LoadUint32(&c.stopped) != 1 {
  203. log.Info("%v failed @ recvHandler", c)
  204. c.Stop()
  205. }
  206. break FOR_LOOP
  207. }
  208. channel := c.channels[string(pkt.Channel)]
  209. if channel == nil {
  210. Panicf("Unknown channel %v", pkt.Channel)
  211. }
  212. channel.recvQueue <- pkt
  213. default:
  214. Panicf("Unknown message type %v", pktType)
  215. }
  216. c.pingRepeatTimer.Reset()
  217. }
  218. log.Debug("%v recvHandler done", c)
  219. // cleanup
  220. close(c.pong)
  221. for _ = range c.pong {
  222. // drain
  223. }
  224. }
  225. /* IOStats */
  226. type IOStats struct {
  227. TimeConnected Time
  228. LastSent Time
  229. LastRecv Time
  230. BytesRecv UInt64
  231. BytesSent UInt64
  232. PktsRecv UInt64
  233. PktsSent UInt64
  234. }