You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

229 lines
4.6 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package peer
  2. import (
  3. "fmt"
  4. "io"
  5. "sync/atomic"
  6. "time"
  7. . "github.com/tendermint/tendermint/binary"
  8. )
  9. /* Peer */
  10. type Peer struct {
  11. outgoing bool
  12. conn *Connection
  13. channels map[String]*Channel
  14. quit chan struct{}
  15. started uint32
  16. stopped uint32
  17. }
  18. func NewPeer(conn *Connection, channels map[String]*Channel) *Peer {
  19. return &Peer{
  20. conn: conn,
  21. channels: channels,
  22. quit: make(chan struct{}),
  23. stopped: 0,
  24. }
  25. }
  26. func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, onPeerError func(*Peer, interface{})) {
  27. log.Debugf("Starting %v", p)
  28. if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
  29. // on connection error
  30. onError := func(r interface{}) {
  31. p.stop()
  32. onPeerError(p, r)
  33. }
  34. p.conn.Start(p.channels, onError)
  35. for chName, _ := range p.channels {
  36. chInQueue := pktRecvQueues[chName]
  37. go p.recvHandler(chName, chInQueue)
  38. go p.sendHandler(chName)
  39. }
  40. }
  41. }
  42. func (p *Peer) stop() {
  43. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  44. log.Debugf("Stopping %v", p)
  45. close(p.quit)
  46. p.conn.Stop()
  47. }
  48. }
  49. func (p *Peer) LocalAddress() *NetAddress {
  50. return p.conn.LocalAddress()
  51. }
  52. func (p *Peer) RemoteAddress() *NetAddress {
  53. return p.conn.RemoteAddress()
  54. }
  55. func (p *Peer) Channel(chName String) *Channel {
  56. return p.channels[chName]
  57. }
  58. // TryQueue returns true if the packet was successfully queued.
  59. // Returning true does not imply that the packet will be sent.
  60. func (p *Peer) TryQueue(pkt Packet) bool {
  61. channel := p.Channel(pkt.Channel)
  62. sendQueue := channel.sendQueue
  63. if atomic.LoadUint32(&p.stopped) == 1 {
  64. return false
  65. }
  66. select {
  67. case sendQueue <- pkt:
  68. return true
  69. default: // buffer full
  70. return false
  71. }
  72. }
  73. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  74. return p.RemoteAddress().WriteTo(w)
  75. }
  76. func (p *Peer) String() string {
  77. return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
  78. }
  79. // sendHandler pulls from a channel and pushes to the connection.
  80. // Each channel gets its own sendHandler goroutine;
  81. // Golang's channel implementation handles the scheduling.
  82. func (p *Peer) sendHandler(chName String) {
  83. log.Tracef("%v sendHandler [%v]", p, chName)
  84. channel := p.channels[chName]
  85. sendQueue := channel.sendQueue
  86. FOR_LOOP:
  87. for {
  88. select {
  89. case <-p.quit:
  90. break FOR_LOOP
  91. case pkt := <-sendQueue:
  92. log.Tracef("Sending packet to peer sendQueue")
  93. // blocks until the connection is Stop'd,
  94. // which happens when this peer is Stop'd.
  95. p.conn.Send(pkt)
  96. }
  97. }
  98. log.Tracef("%v sendHandler [%v] closed", p, chName)
  99. // cleanup
  100. // (none)
  101. }
  102. // recvHandler pulls from a channel and pushes to the given pktRecvQueue.
  103. // Each channel gets its own recvHandler goroutine.
  104. // Many peers have goroutines that push to the same pktRecvQueue.
  105. // Golang's channel implementation handles the scheduling.
  106. func (p *Peer) recvHandler(chName String, pktRecvQueue chan<- *InboundPacket) {
  107. log.Tracef("%v recvHandler [%v]", p, chName)
  108. channel := p.channels[chName]
  109. recvQueue := channel.recvQueue
  110. FOR_LOOP:
  111. for {
  112. select {
  113. case <-p.quit:
  114. break FOR_LOOP
  115. case pkt := <-recvQueue:
  116. // send to pktRecvQueue
  117. inboundPacket := &InboundPacket{
  118. Peer: p,
  119. Time: Time{time.Now()},
  120. Packet: pkt,
  121. }
  122. select {
  123. case <-p.quit:
  124. break FOR_LOOP
  125. case pktRecvQueue <- inboundPacket:
  126. continue
  127. }
  128. }
  129. }
  130. log.Tracef("%v recvHandler [%v] closed", p, chName)
  131. // cleanup
  132. // (none)
  133. }
  134. /* Channel */
  135. type Channel struct {
  136. name String
  137. recvQueue chan Packet
  138. sendQueue chan Packet
  139. //stats Stats
  140. }
  141. func NewChannel(name String, bufferSize int) *Channel {
  142. return &Channel{
  143. name: name,
  144. recvQueue: make(chan Packet, bufferSize),
  145. sendQueue: make(chan Packet, bufferSize),
  146. }
  147. }
  148. func (c *Channel) Name() String {
  149. return c.name
  150. }
  151. func (c *Channel) RecvQueue() <-chan Packet {
  152. return c.recvQueue
  153. }
  154. func (c *Channel) SendQueue() chan<- Packet {
  155. return c.sendQueue
  156. }
  157. /* Packet */
  158. /*
  159. Packet encapsulates a ByteSlice on a Channel.
  160. */
  161. type Packet struct {
  162. Channel String
  163. Bytes ByteSlice
  164. // Hash
  165. }
  166. func NewPacket(chName String, bytes ByteSlice) Packet {
  167. return Packet{
  168. Channel: chName,
  169. Bytes: bytes,
  170. }
  171. }
  172. func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
  173. n, err = WriteOnto(p.Channel, w, n, err)
  174. n, err = WriteOnto(p.Bytes, w, n, err)
  175. return
  176. }
  177. func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
  178. chName, err := ReadStringSafe(r)
  179. if err != nil {
  180. return
  181. }
  182. // TODO: packet length sanity check.
  183. bytes, err := ReadByteSliceSafe(r)
  184. if err != nil {
  185. return
  186. }
  187. return NewPacket(chName, bytes), nil
  188. }
  189. /*
  190. InboundPacket extends Packet with fields relevant to incoming packets.
  191. */
  192. type InboundPacket struct {
  193. Peer *Peer
  194. Time Time
  195. Packet
  196. }