You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

237 lines
4.8 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package peer
  2. import (
  3. "fmt"
  4. "io"
  5. "sync/atomic"
  6. "time"
  7. . "github.com/tendermint/tendermint/binary"
  8. )
  9. /* Peer */
  10. type Peer struct {
  11. outgoing bool
  12. conn *Connection
  13. channels map[string]*Channel
  14. quit chan struct{}
  15. started uint32
  16. stopped uint32
  17. }
  18. func newPeer(conn *Connection, channels map[string]*Channel) *Peer {
  19. return &Peer{
  20. conn: conn,
  21. channels: channels,
  22. quit: make(chan struct{}),
  23. stopped: 0,
  24. }
  25. }
  26. func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) {
  27. log.Debugf("Starting %v", p)
  28. if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
  29. // on connection error
  30. onError := func(r interface{}) {
  31. p.stop()
  32. onPeerError(p, r)
  33. }
  34. p.conn.Start(p.channels, onError)
  35. for chName, _ := range p.channels {
  36. chInQueue := pktRecvQueues[chName]
  37. go p.recvHandler(chName, chInQueue)
  38. go p.sendHandler(chName)
  39. }
  40. }
  41. }
  42. func (p *Peer) stop() {
  43. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  44. log.Debugf("Stopping %v", p)
  45. close(p.quit)
  46. p.conn.Stop()
  47. }
  48. }
  49. func (p *Peer) LocalAddress() *NetAddress {
  50. return p.conn.LocalAddress()
  51. }
  52. func (p *Peer) RemoteAddress() *NetAddress {
  53. return p.conn.RemoteAddress()
  54. }
  55. func (p *Peer) Channel(chName string) *Channel {
  56. return p.channels[chName]
  57. }
  58. // TryQueue returns true if the packet was successfully queued.
  59. // Returning true does not imply that the packet will be sent.
  60. func (p *Peer) TryQueue(pkt Packet) bool {
  61. channel := p.Channel(string(pkt.Channel))
  62. sendQueue := channel.sendQueue
  63. if atomic.LoadUint32(&p.stopped) == 1 {
  64. return false
  65. }
  66. select {
  67. case sendQueue <- pkt:
  68. return true
  69. default: // buffer full
  70. return false
  71. }
  72. }
  73. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  74. return p.RemoteAddress().WriteTo(w)
  75. }
  76. func (p *Peer) String() string {
  77. return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
  78. }
  79. // sendHandler pulls from a channel and pushes to the connection.
  80. // Each channel gets its own sendHandler goroutine;
  81. // Golang's channel implementation handles the scheduling.
  82. func (p *Peer) sendHandler(chName string) {
  83. log.Tracef("%v sendHandler [%v]", p, chName)
  84. channel := p.channels[chName]
  85. sendQueue := channel.sendQueue
  86. FOR_LOOP:
  87. for {
  88. select {
  89. case <-p.quit:
  90. break FOR_LOOP
  91. case pkt := <-sendQueue:
  92. log.Tracef("Sending packet to peer sendQueue")
  93. // blocks until the connection is Stop'd,
  94. // which happens when this peer is Stop'd.
  95. p.conn.Send(pkt)
  96. }
  97. }
  98. log.Tracef("%v sendHandler [%v] closed", p, chName)
  99. // cleanup
  100. // (none)
  101. }
  102. // recvHandler pulls from a channel and pushes to the given pktRecvQueue.
  103. // Each channel gets its own recvHandler goroutine.
  104. // Many peers have goroutines that push to the same pktRecvQueue.
  105. // Golang's channel implementation handles the scheduling.
  106. func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) {
  107. log.Tracef("%v recvHandler [%v]", p, chName)
  108. channel := p.channels[chName]
  109. recvQueue := channel.recvQueue
  110. FOR_LOOP:
  111. for {
  112. select {
  113. case <-p.quit:
  114. break FOR_LOOP
  115. case pkt := <-recvQueue:
  116. // send to pktRecvQueue
  117. inboundPacket := &InboundPacket{
  118. Peer: p,
  119. Time: Time{time.Now()},
  120. Packet: pkt,
  121. }
  122. select {
  123. case <-p.quit:
  124. break FOR_LOOP
  125. case pktRecvQueue <- inboundPacket:
  126. continue
  127. }
  128. }
  129. }
  130. log.Tracef("%v recvHandler [%v] closed", p, chName)
  131. // cleanup
  132. // (none)
  133. }
  134. /* ChannelDescriptor */
  135. type ChannelDescriptor struct {
  136. Name string
  137. SendBufferSize int
  138. RecvBufferSize int
  139. }
  140. /* Channel */
  141. type Channel struct {
  142. name string
  143. recvQueue chan Packet
  144. sendQueue chan Packet
  145. //stats Stats
  146. }
  147. func newChannel(desc ChannelDescriptor) *Channel {
  148. return &Channel{
  149. name: desc.Name,
  150. recvQueue: make(chan Packet, desc.RecvBufferSize),
  151. sendQueue: make(chan Packet, desc.SendBufferSize),
  152. }
  153. }
  154. func (c *Channel) Name() string {
  155. return c.name
  156. }
  157. func (c *Channel) RecvQueue() <-chan Packet {
  158. return c.recvQueue
  159. }
  160. func (c *Channel) SendQueue() chan<- Packet {
  161. return c.sendQueue
  162. }
  163. /* Packet */
  164. /*
  165. Packet encapsulates a ByteSlice on a Channel.
  166. */
  167. type Packet struct {
  168. Channel String
  169. Bytes ByteSlice
  170. // Hash
  171. }
  172. func NewPacket(chName String, bytes ByteSlice) Packet {
  173. return Packet{
  174. Channel: chName,
  175. Bytes: bytes,
  176. }
  177. }
  178. func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
  179. n, err = WriteOnto(p.Channel, w, n, err)
  180. n, err = WriteOnto(p.Bytes, w, n, err)
  181. return
  182. }
  183. func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
  184. chName, err := ReadStringSafe(r)
  185. if err != nil {
  186. return
  187. }
  188. // TODO: packet length sanity check.
  189. bytes, err := ReadByteSliceSafe(r)
  190. if err != nil {
  191. return
  192. }
  193. return NewPacket(chName, bytes), nil
  194. }
  195. /*
  196. InboundPacket extends Packet with fields relevant to incoming packets.
  197. */
  198. type InboundPacket struct {
  199. Peer *Peer
  200. Time Time
  201. Packet
  202. }