You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
5.7 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. . "github.com/tendermint/tendermint/binary"
  9. . "github.com/tendermint/tendermint/common"
  10. )
  11. const (
  12. MinReadBufferSize = 1024
  13. MinWriteBufferSize = 1024
  14. FlushThrottleMS = 50
  15. OutQueueSize = 50
  16. IdleTimeoutMinutes = 5
  17. PingTimeoutMinutes = 2
  18. )
  19. /*
  20. A Connection wraps a network connection and handles buffering and multiplexing.
  21. "Packets" are sent with ".Send(Packet)".
  22. Packets received are sent to channels as commanded by the ".Start(...)" method.
  23. */
  24. type Connection struct {
  25. ioStats IOStats
  26. sendQueue chan Packet // never closes
  27. conn net.Conn
  28. bufReader *bufio.Reader
  29. bufWriter *bufio.Writer
  30. flushThrottler *Throttler
  31. quit chan struct{}
  32. pingRepeatTimer *RepeatTimer
  33. pong chan struct{}
  34. channels map[string]*Channel
  35. onError func(interface{})
  36. started uint32
  37. stopped uint32
  38. errored uint32
  39. }
  40. var (
  41. PacketTypePing = UInt8(0x00)
  42. PacketTypePong = UInt8(0x01)
  43. PacketTypeMessage = UInt8(0x10)
  44. )
  45. func NewConnection(conn net.Conn) *Connection {
  46. return &Connection{
  47. sendQueue: make(chan Packet, OutQueueSize),
  48. conn: conn,
  49. bufReader: bufio.NewReaderSize(conn, MinReadBufferSize),
  50. bufWriter: bufio.NewWriterSize(conn, MinWriteBufferSize),
  51. flushThrottler: NewThrottler(FlushThrottleMS * time.Millisecond),
  52. quit: make(chan struct{}),
  53. pingRepeatTimer: NewRepeatTimer(PingTimeoutMinutes * time.Minute),
  54. pong: make(chan struct{}),
  55. }
  56. }
  57. // .Start() begins multiplexing packets to and from "channels".
  58. // If an error occurs, the recovered reason is passed to "onError".
  59. func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) {
  60. log.Debugf("Starting %v", c)
  61. if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
  62. c.channels = channels
  63. c.onError = onError
  64. go c.sendHandler()
  65. go c.recvHandler()
  66. }
  67. }
  68. func (c *Connection) Stop() {
  69. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  70. log.Debugf("Stopping %v", c)
  71. close(c.quit)
  72. c.conn.Close()
  73. c.flushThrottler.Stop()
  74. c.pingRepeatTimer.Stop()
  75. // We can't close pong safely here because
  76. // recvHandler may write to it after we've stopped.
  77. // Though it doesn't need to get closed at all,
  78. // we close it @ recvHandler.
  79. // close(c.pong)
  80. }
  81. }
  82. func (c *Connection) LocalAddress() *NetAddress {
  83. return NewNetAddress(c.conn.LocalAddr())
  84. }
  85. func (c *Connection) RemoteAddress() *NetAddress {
  86. return NewNetAddress(c.conn.RemoteAddr())
  87. }
  88. // Returns true if successfully queued,
  89. // Returns false if connection was closed.
  90. // Blocks.
  91. func (c *Connection) Send(pkt Packet) bool {
  92. select {
  93. case c.sendQueue <- pkt:
  94. return true
  95. case <-c.quit:
  96. return false
  97. }
  98. }
  99. func (c *Connection) String() string {
  100. return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
  101. }
  102. func (c *Connection) flush() {
  103. // TODO: this is pretty naive.
  104. // We end up flushing when we don't have to (yet).
  105. // A better solution might require us implementing our own buffered writer.
  106. err := c.bufWriter.Flush()
  107. if err != nil {
  108. if atomic.LoadUint32(&c.stopped) != 1 {
  109. log.Warnf("Connection flush failed: %v", err)
  110. }
  111. }
  112. }
  113. // Catch panics, usually caused by remote disconnects.
  114. func (c *Connection) _recover() {
  115. if r := recover(); r != nil {
  116. c.Stop()
  117. if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
  118. if c.onError != nil {
  119. c.onError(r)
  120. }
  121. }
  122. }
  123. }
  124. // sendHandler pulls from .sendQueue and writes to .bufWriter
  125. func (c *Connection) sendHandler() {
  126. log.Tracef("%v sendHandler", c)
  127. defer c._recover()
  128. FOR_LOOP:
  129. for {
  130. var err error
  131. select {
  132. case sendPkt := <-c.sendQueue:
  133. log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
  134. _, err = PacketTypeMessage.WriteTo(c.bufWriter)
  135. if err != nil {
  136. break
  137. }
  138. _, err = sendPkt.WriteTo(c.bufWriter)
  139. c.flushThrottler.Set()
  140. case <-c.flushThrottler.Ch:
  141. c.flush()
  142. case <-c.pingRepeatTimer.Ch:
  143. _, err = PacketTypePing.WriteTo(c.bufWriter)
  144. c.flush()
  145. case <-c.pong:
  146. _, err = PacketTypePong.WriteTo(c.bufWriter)
  147. c.flush()
  148. case <-c.quit:
  149. break FOR_LOOP
  150. }
  151. if atomic.LoadUint32(&c.stopped) == 1 {
  152. break FOR_LOOP
  153. }
  154. if err != nil {
  155. log.Infof("%v failed @ sendHandler:\n%v", c, err)
  156. c.Stop()
  157. break FOR_LOOP
  158. }
  159. }
  160. log.Tracef("%v sendHandler done", c)
  161. // cleanup
  162. }
  163. // recvHandler reads from .bufReader and pushes to the appropriate
  164. // channel's recvQueue.
  165. func (c *Connection) recvHandler() {
  166. log.Tracef("%v recvHandler", c)
  167. defer c._recover()
  168. FOR_LOOP:
  169. for {
  170. pktType, err := ReadUInt8Safe(c.bufReader)
  171. if err != nil {
  172. if atomic.LoadUint32(&c.stopped) != 1 {
  173. log.Infof("%v failed @ recvHandler", c)
  174. c.Stop()
  175. }
  176. break FOR_LOOP
  177. } else {
  178. log.Tracef("Found pktType %v", pktType)
  179. }
  180. switch pktType {
  181. case PacketTypePing:
  182. // TODO: keep track of these, make sure it isn't abused
  183. // as they cause flush()'s in the send buffer.
  184. c.pong <- struct{}{}
  185. case PacketTypePong:
  186. // do nothing
  187. case PacketTypeMessage:
  188. pkt, err := ReadPacketSafe(c.bufReader)
  189. if err != nil {
  190. if atomic.LoadUint32(&c.stopped) != 1 {
  191. log.Infof("%v failed @ recvHandler", c)
  192. c.Stop()
  193. }
  194. break FOR_LOOP
  195. }
  196. channel := c.channels[string(pkt.Channel)]
  197. if channel == nil {
  198. Panicf("Unknown channel %v", pkt.Channel)
  199. }
  200. channel.recvQueue <- pkt
  201. default:
  202. Panicf("Unknown message type %v", pktType)
  203. }
  204. c.pingRepeatTimer.Reset()
  205. }
  206. log.Tracef("%v recvHandler done", c)
  207. // cleanup
  208. close(c.pong)
  209. for _ = range c.pong {
  210. // drain
  211. }
  212. }
  213. /* IOStats */
  214. type IOStats struct {
  215. TimeConnected Time
  216. LastSent Time
  217. LastRecv Time
  218. BytesRecv UInt64
  219. BytesSent UInt64
  220. PktsRecv UInt64
  221. PktsSent UInt64
  222. }