You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
4.9 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package peer
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. . "github.com/tendermint/tendermint/binary"
  7. . "github.com/tendermint/tendermint/common"
  8. "github.com/tendermint/tendermint/merkle"
  9. )
  10. /*
  11. A client is half of a p2p system.
  12. It can reach out to the network and establish connections with other peers.
  13. A client doesn't listen for incoming connections -- that's done by the server.
  14. All communication amongst peers are multiplexed by "channels".
  15. (Not the same as Go "channels")
  16. To send a message, encapsulate it into a "Packet" and send it to each peer.
  17. You can find all connected and active peers by iterating over ".Peers()".
  18. ".Broadcast()" is provided for convenience, but by iterating over
  19. the peers manually the caller can decide which subset receives a message.
  20. Incoming messages are received by calling ".Receive()".
  21. */
  22. type Client struct {
  23. addrBook *AddrBook
  24. targetNumPeers int
  25. makePeerFn func(*Connection) *Peer
  26. self *Peer
  27. pktRecvQueues map[String]chan *InboundPacket
  28. peersMtx sync.Mutex
  29. peers merkle.Tree // addr -> *Peer
  30. quit chan struct{}
  31. stopped uint32
  32. }
  33. var (
  34. ErrClientStopped = errors.New("Client already stopped")
  35. ErrClientDuplicatePeer = errors.New("Duplicate peer")
  36. )
  37. // "makePeerFn" is a factory method for generating new peers from new *Connections.
  38. // "makePeerFn(nil)" must return a prototypical peer that represents the self "peer".
  39. func NewClient(makePeerFn func(*Connection) *Peer) *Client {
  40. self := makePeerFn(nil)
  41. if self == nil {
  42. Panicf("makePeerFn(nil) must return a prototypical peer for self")
  43. }
  44. pktRecvQueues := make(map[String]chan *InboundPacket)
  45. for chName, _ := range self.channels {
  46. pktRecvQueues[chName] = make(chan *InboundPacket)
  47. }
  48. c := &Client{
  49. addrBook: nil, // TODO
  50. targetNumPeers: 0, // TODO
  51. makePeerFn: makePeerFn,
  52. self: self,
  53. pktRecvQueues: pktRecvQueues,
  54. peers: merkle.NewIAVLTree(nil),
  55. quit: make(chan struct{}),
  56. stopped: 0,
  57. }
  58. // automatically start
  59. c.start()
  60. return c
  61. }
  62. func (c *Client) start() {
  63. // Handle PEX messages
  64. // TODO: hmm
  65. // go peerExchangeHandler(c)
  66. }
  67. func (c *Client) Stop() {
  68. log.Infof("Stopping client")
  69. // lock
  70. c.peersMtx.Lock()
  71. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  72. close(c.quit)
  73. // stop each peer.
  74. for peerValue := range c.peers.Values() {
  75. peer := peerValue.(*Peer)
  76. peer.stop()
  77. }
  78. // empty tree.
  79. c.peers = merkle.NewIAVLTree(nil)
  80. }
  81. c.peersMtx.Unlock()
  82. // unlock
  83. }
  84. func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
  85. if atomic.LoadUint32(&c.stopped) == 1 {
  86. return nil, ErrClientStopped
  87. }
  88. log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing)
  89. peer := c.makePeerFn(conn)
  90. peer.outgoing = outgoing
  91. err := c.addPeer(peer)
  92. if err != nil {
  93. return nil, err
  94. }
  95. go peer.start(c.pktRecvQueues, c.StopPeerForError)
  96. return peer, nil
  97. }
  98. func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) {
  99. if atomic.LoadUint32(&c.stopped) == 1 {
  100. return
  101. }
  102. log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
  103. for v := range c.peers.Values() {
  104. peer := v.(*Peer)
  105. success := peer.TryQueue(pkt)
  106. log.Tracef("Broadcast for peer %v success: %v", peer, success)
  107. if success {
  108. numSuccess += 1
  109. } else {
  110. numFailure += 1
  111. }
  112. }
  113. return
  114. }
  115. /*
  116. Receive blocks on a channel until a message is found.
  117. */
  118. func (c *Client) Receive(chName String) *InboundPacket {
  119. if atomic.LoadUint32(&c.stopped) == 1 {
  120. return nil
  121. }
  122. log.Tracef("Receive on [%v]", chName)
  123. q := c.pktRecvQueues[chName]
  124. if q == nil {
  125. Panicf("Expected pktRecvQueues[%f], found none", chName)
  126. }
  127. select {
  128. case <-c.quit:
  129. return nil
  130. case inPacket := <-q:
  131. return inPacket
  132. }
  133. }
  134. func (c *Client) Peers() merkle.Tree {
  135. // lock & defer
  136. c.peersMtx.Lock()
  137. defer c.peersMtx.Unlock()
  138. return c.peers.Copy()
  139. // unlock deferred
  140. }
  141. // Disconnect from a peer due to external error.
  142. // TODO: make record depending on reason.
  143. func (c *Client) StopPeerForError(peer *Peer, reason interface{}) {
  144. log.Infof("%v errored: %v", peer, reason)
  145. c.StopPeer(peer, false)
  146. }
  147. // Disconnect from a peer.
  148. // If graceful is true, last message sent is a disconnect message.
  149. // TODO: handle graceful disconnects.
  150. func (c *Client) StopPeer(peer *Peer, graceful bool) {
  151. // lock
  152. c.peersMtx.Lock()
  153. peerValue, _ := c.peers.Remove(peer.RemoteAddress())
  154. c.peersMtx.Unlock()
  155. // unlock
  156. peer_ := peerValue.(*Peer)
  157. if peer_ != nil {
  158. peer_.stop()
  159. }
  160. }
  161. func (c *Client) addPeer(peer *Peer) error {
  162. addr := peer.RemoteAddress()
  163. // lock & defer
  164. c.peersMtx.Lock()
  165. defer c.peersMtx.Unlock()
  166. if c.stopped == 1 {
  167. return ErrClientStopped
  168. }
  169. if !c.peers.Has(addr) {
  170. log.Tracef("Actually putting addr: %v, peer: %v", addr, peer)
  171. c.peers.Put(addr, peer)
  172. return nil
  173. } else {
  174. // ignore duplicate peer for addr.
  175. log.Infof("Ignoring duplicate peer for addr %v", addr)
  176. return ErrClientDuplicatePeer
  177. }
  178. // unlock deferred
  179. }