You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
5.5 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "sync/atomic"
  7. "time"
  8. . "github.com/tendermint/tendermint/binary"
  9. )
  10. /* Peer */
  11. type Peer struct {
  12. outbound bool
  13. conn *Connection
  14. channels map[string]*Channel
  15. quit chan struct{}
  16. started uint32
  17. stopped uint32
  18. }
  19. func newPeer(conn *Connection, channels map[string]*Channel) *Peer {
  20. return &Peer{
  21. conn: conn,
  22. channels: channels,
  23. quit: make(chan struct{}),
  24. stopped: 0,
  25. }
  26. }
  27. func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) {
  28. log.Debug("Starting %v", p)
  29. if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
  30. // on connection error
  31. onError := func(r interface{}) {
  32. p.stop()
  33. onPeerError(p, r)
  34. }
  35. p.conn.Start(p.channels, onError)
  36. for chName, _ := range p.channels {
  37. chInQueue := pktRecvQueues[chName]
  38. go p.recvHandler(chName, chInQueue)
  39. go p.sendHandler(chName)
  40. }
  41. }
  42. }
  43. func (p *Peer) stop() {
  44. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  45. log.Debug("Stopping %v", p)
  46. close(p.quit)
  47. p.conn.Stop()
  48. }
  49. }
  50. func (p *Peer) IsOutbound() bool {
  51. return p.outbound
  52. }
  53. func (p *Peer) LocalAddress() *NetAddress {
  54. return p.conn.LocalAddress()
  55. }
  56. func (p *Peer) RemoteAddress() *NetAddress {
  57. return p.conn.RemoteAddress()
  58. }
  59. func (p *Peer) Channel(chName string) *Channel {
  60. return p.channels[chName]
  61. }
  62. // TrySend returns true if the packet was successfully queued.
  63. // Returning true does not imply that the packet will be sent.
  64. func (p *Peer) TrySend(pkt Packet) bool {
  65. channel := p.Channel(string(pkt.Channel))
  66. sendQueue := channel.sendQueue
  67. if atomic.LoadUint32(&p.stopped) == 1 {
  68. return false
  69. }
  70. select {
  71. case sendQueue <- pkt:
  72. log.Debug("SEND %v: %v", p, pkt)
  73. return true
  74. default: // buffer full
  75. log.Debug("FAIL SEND %v: %v", p, pkt)
  76. return false
  77. }
  78. }
  79. func (p *Peer) Send(pkt Packet) bool {
  80. channel := p.Channel(string(pkt.Channel))
  81. sendQueue := channel.sendQueue
  82. if atomic.LoadUint32(&p.stopped) == 1 {
  83. return false
  84. }
  85. sendQueue <- pkt
  86. log.Debug("SEND %v: %v", p, pkt)
  87. return true
  88. }
  89. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  90. return p.RemoteAddress().WriteTo(w)
  91. }
  92. func (p *Peer) String() string {
  93. if p.outbound {
  94. return fmt.Sprintf("P(->%v)", p.conn)
  95. } else {
  96. return fmt.Sprintf("P(%v->)", p.conn)
  97. }
  98. }
  99. // sendHandler pulls from a channel and pushes to the connection.
  100. // Each channel gets its own sendHandler goroutine;
  101. // Golang's channel implementation handles the scheduling.
  102. func (p *Peer) sendHandler(chName string) {
  103. // log.Debug("%v sendHandler [%v]", p, chName)
  104. channel := p.channels[chName]
  105. sendQueue := channel.sendQueue
  106. FOR_LOOP:
  107. for {
  108. select {
  109. case <-p.quit:
  110. break FOR_LOOP
  111. case pkt := <-sendQueue:
  112. // blocks until the connection is Stop'd,
  113. // which happens when this peer is Stop'd.
  114. p.conn.Send(pkt)
  115. }
  116. }
  117. // log.Debug("%v sendHandler [%v] closed", p, chName)
  118. // Cleanup
  119. }
  120. // recvHandler pulls from a channel and pushes to the given pktRecvQueue.
  121. // Each channel gets its own recvHandler goroutine.
  122. // Many peers have goroutines that push to the same pktRecvQueue.
  123. // Golang's channel implementation handles the scheduling.
  124. func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) {
  125. // log.Debug("%v recvHandler [%v]", p, chName)
  126. channel := p.channels[chName]
  127. recvQueue := channel.recvQueue
  128. FOR_LOOP:
  129. for {
  130. select {
  131. case <-p.quit:
  132. break FOR_LOOP
  133. case pkt := <-recvQueue:
  134. // send to pktRecvQueue
  135. inboundPacket := &InboundPacket{
  136. Peer: p,
  137. Time: Time{time.Now()},
  138. Packet: pkt,
  139. }
  140. select {
  141. case <-p.quit:
  142. break FOR_LOOP
  143. case pktRecvQueue <- inboundPacket:
  144. continue
  145. }
  146. }
  147. }
  148. // log.Debug("%v recvHandler [%v] closed", p, chName)
  149. // Cleanup
  150. }
  151. //-----------------------------------------------------------------------------
  // ChannelDescriptor describes a channel to create: its name and the buffer
  // sizes of its two packet queues.
  type ChannelDescriptor struct {
  	Name           string // channel name
  	SendBufferSize int    // buffer size used for the channel's send queue
  	RecvBufferSize int    // buffer size used for the channel's receive queue
  }
  158. /* Channel */
  159. type Channel struct {
  160. name string
  161. recvQueue chan Packet
  162. sendQueue chan Packet
  163. //stats Stats
  164. }
  165. func newChannel(desc ChannelDescriptor) *Channel {
  166. return &Channel{
  167. name: desc.Name,
  168. recvQueue: make(chan Packet, desc.RecvBufferSize),
  169. sendQueue: make(chan Packet, desc.SendBufferSize),
  170. }
  171. }
  172. func (c *Channel) Name() string {
  173. return c.name
  174. }
  175. func (c *Channel) RecvQueue() <-chan Packet {
  176. return c.recvQueue
  177. }
  178. func (c *Channel) SendQueue() chan<- Packet {
  179. return c.sendQueue
  180. }
  181. //-----------------------------------------------------------------------------
  182. /*
  183. Packet encapsulates a ByteSlice on a Channel.
  184. */
  185. type Packet struct {
  186. Channel String
  187. Bytes ByteSlice
  188. // Hash
  189. }
  190. func NewPacket(chName String, msg Binary) Packet {
  191. msgBytes := BinaryBytes(msg)
  192. return Packet{
  193. Channel: chName,
  194. Bytes: msgBytes,
  195. }
  196. }
  197. func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
  198. n, err = WriteOnto(p.Channel, w, n, err)
  199. n, err = WriteOnto(p.Bytes, w, n, err)
  200. return
  201. }
  202. func (p Packet) Reader() io.Reader {
  203. return bytes.NewReader(p.Bytes)
  204. }
  205. func (p Packet) String() string {
  206. return fmt.Sprintf("%v:%X", p.Channel, p.Bytes)
  207. }
  208. func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
  209. chName, err := ReadStringSafe(r)
  210. if err != nil {
  211. return
  212. }
  213. // TODO: packet length sanity check.
  214. bytes, err := ReadByteSliceSafe(r)
  215. if err != nil {
  216. return
  217. }
  218. return Packet{Channel: chName, Bytes: bytes}, nil
  219. }
  220. /*
  221. InboundPacket extends Packet with fields relevant to inbound packets.
  222. */
  223. type InboundPacket struct {
  224. Peer *Peer
  225. Time Time
  226. Packet
  227. }