You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

243 lines
4.9 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "sync/atomic"
  6. "time"
  7. . "github.com/tendermint/tendermint/binary"
  8. )
  9. /* Peer */
  10. type Peer struct {
  11. outgoing bool
  12. conn *Connection
  13. channels map[string]*Channel
  14. quit chan struct{}
  15. started uint32
  16. stopped uint32
  17. }
  18. func newPeer(conn *Connection, channels map[string]*Channel) *Peer {
  19. return &Peer{
  20. conn: conn,
  21. channels: channels,
  22. quit: make(chan struct{}),
  23. stopped: 0,
  24. }
  25. }
  26. func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) {
  27. log.Debugf("Starting %v", p)
  28. if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
  29. // on connection error
  30. onError := func(r interface{}) {
  31. p.stop()
  32. onPeerError(p, r)
  33. }
  34. p.conn.Start(p.channels, onError)
  35. for chName, _ := range p.channels {
  36. chInQueue := pktRecvQueues[chName]
  37. go p.recvHandler(chName, chInQueue)
  38. go p.sendHandler(chName)
  39. }
  40. }
  41. }
  42. func (p *Peer) stop() {
  43. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  44. log.Debugf("Stopping %v", p)
  45. close(p.quit)
  46. p.conn.Stop()
  47. }
  48. }
  49. func (p *Peer) IsOutgoing() bool {
  50. return p.outgoing
  51. }
  52. func (p *Peer) LocalAddress() *NetAddress {
  53. return p.conn.LocalAddress()
  54. }
  55. func (p *Peer) RemoteAddress() *NetAddress {
  56. return p.conn.RemoteAddress()
  57. }
  58. func (p *Peer) Channel(chName string) *Channel {
  59. return p.channels[chName]
  60. }
  61. // TryQueue returns true if the packet was successfully queued.
  62. // Returning true does not imply that the packet will be sent.
  63. func (p *Peer) TryQueue(pkt Packet) bool {
  64. channel := p.Channel(string(pkt.Channel))
  65. sendQueue := channel.sendQueue
  66. if atomic.LoadUint32(&p.stopped) == 1 {
  67. return false
  68. }
  69. sendQueue <- pkt
  70. return true
  71. select {
  72. case sendQueue <- pkt:
  73. return true
  74. default: // buffer full
  75. return false
  76. }
  77. }
  78. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  79. return p.RemoteAddress().WriteTo(w)
  80. }
  81. func (p *Peer) String() string {
  82. return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
  83. }
  84. // sendHandler pulls from a channel and pushes to the connection.
  85. // Each channel gets its own sendHandler goroutine;
  86. // Golang's channel implementation handles the scheduling.
  87. func (p *Peer) sendHandler(chName string) {
  88. log.Tracef("%v sendHandler [%v]", p, chName)
  89. channel := p.channels[chName]
  90. sendQueue := channel.sendQueue
  91. FOR_LOOP:
  92. for {
  93. select {
  94. case <-p.quit:
  95. break FOR_LOOP
  96. case pkt := <-sendQueue:
  97. log.Tracef("Sending packet to peer sendQueue")
  98. // blocks until the connection is Stop'd,
  99. // which happens when this peer is Stop'd.
  100. p.conn.Send(pkt)
  101. }
  102. }
  103. log.Tracef("%v sendHandler [%v] closed", p, chName)
  104. // cleanup
  105. // (none)
  106. }
  107. // recvHandler pulls from a channel and pushes to the given pktRecvQueue.
  108. // Each channel gets its own recvHandler goroutine.
  109. // Many peers have goroutines that push to the same pktRecvQueue.
  110. // Golang's channel implementation handles the scheduling.
  111. func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) {
  112. log.Tracef("%v recvHandler [%v]", p, chName)
  113. channel := p.channels[chName]
  114. recvQueue := channel.recvQueue
  115. FOR_LOOP:
  116. for {
  117. select {
  118. case <-p.quit:
  119. break FOR_LOOP
  120. case pkt := <-recvQueue:
  121. // send to pktRecvQueue
  122. inboundPacket := &InboundPacket{
  123. Peer: p,
  124. Time: Time{time.Now()},
  125. Packet: pkt,
  126. }
  127. select {
  128. case <-p.quit:
  129. break FOR_LOOP
  130. case pktRecvQueue <- inboundPacket:
  131. continue
  132. }
  133. }
  134. }
  135. log.Tracef("%v recvHandler [%v] closed", p, chName)
  136. // cleanup
  137. // (none)
  138. }
  139. /* ChannelDescriptor */
  140. type ChannelDescriptor struct {
  141. Name string
  142. SendBufferSize int
  143. RecvBufferSize int
  144. }
  145. /* Channel */
  146. type Channel struct {
  147. name string
  148. recvQueue chan Packet
  149. sendQueue chan Packet
  150. //stats Stats
  151. }
  152. func newChannel(desc ChannelDescriptor) *Channel {
  153. return &Channel{
  154. name: desc.Name,
  155. recvQueue: make(chan Packet, desc.RecvBufferSize),
  156. sendQueue: make(chan Packet, desc.SendBufferSize),
  157. }
  158. }
  159. func (c *Channel) Name() string {
  160. return c.name
  161. }
  162. func (c *Channel) RecvQueue() <-chan Packet {
  163. return c.recvQueue
  164. }
  165. func (c *Channel) SendQueue() chan<- Packet {
  166. return c.sendQueue
  167. }
  168. /* Packet */
  169. /*
  170. Packet encapsulates a ByteSlice on a Channel.
  171. */
  172. type Packet struct {
  173. Channel String
  174. Bytes ByteSlice
  175. // Hash
  176. }
  177. func NewPacket(chName String, bytes ByteSlice) Packet {
  178. return Packet{
  179. Channel: chName,
  180. Bytes: bytes,
  181. }
  182. }
  183. func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
  184. n, err = WriteOnto(p.Channel, w, n, err)
  185. n, err = WriteOnto(p.Bytes, w, n, err)
  186. return
  187. }
  188. func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
  189. chName, err := ReadStringSafe(r)
  190. if err != nil {
  191. return
  192. }
  193. // TODO: packet length sanity check.
  194. bytes, err := ReadByteSliceSafe(r)
  195. if err != nil {
  196. return
  197. }
  198. return NewPacket(chName, bytes), nil
  199. }
  200. /*
  201. InboundPacket extends Packet with fields relevant to incoming packets.
  202. */
  203. type InboundPacket struct {
  204. Peer *Peer
  205. Time Time
  206. Packet
  207. }