You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
4.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package peer
  2. import (
  3. . "github.com/tendermint/tendermint/common"
  4. . "github.com/tendermint/tendermint/binary"
  5. "github.com/tendermint/tendermint/merkle"
  6. "sync/atomic"
  7. "sync"
  8. "errors"
  9. )
  10. /* Client
  11. A client is half of a p2p system.
  12. It can reach out to the network and establish connections with servers.
  13. A client doesn't listen for incoming connections -- that's done by the server.
  14. newPeerCb is a factory method for generating new peers from new *Connections.
  15. newPeerCb(nil) must return a prototypical peer that represents the self "peer".
  16. XXX what about peer disconnects?
  17. */
  18. type Client struct {
  19. addrBook AddrBook
  20. targetNumPeers int
  21. newPeerCb func(*Connection) *Peer
  22. self *Peer
  23. inQueues map[String]chan *InboundMsg
  24. mtx sync.Mutex
  25. peers merkle.Tree // addr -> *Peer
  26. quit chan struct{}
  27. stopped uint32
  28. }
  29. var (
  30. CLIENT_STOPPED_ERROR = errors.New("Client already stopped")
  31. CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
  32. )
  33. func NewClient(newPeerCb func(*Connection) *Peer) *Client {
  34. self := newPeerCb(nil)
  35. if self == nil {
  36. Panicf("newPeerCb(nil) must return a prototypical peer for self")
  37. }
  38. inQueues := make(map[String]chan *InboundMsg)
  39. for chName, channel := range self.channels {
  40. inQueues[chName] = make(chan *InboundMsg)
  41. }
  42. c := &Client{
  43. newPeerCb: newPeerCb,
  44. peers: merkle.NewIAVLTree(nil),
  45. self: self,
  46. inQueues: inQueues,
  47. }
  48. return c
  49. }
  50. func (c *Client) Stop() {
  51. // lock
  52. c.mtx.Lock()
  53. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  54. close(c.quit)
  55. // stop each peer.
  56. for peerValue := range c.peers.Values() {
  57. peer := peerValue.(*Peer)
  58. peer.Stop()
  59. }
  60. // empty tree.
  61. c.peers = merkle.NewIAVLTree(nil)
  62. }
  63. c.mtx.Unlock()
  64. // unlock
  65. }
  66. func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
  67. if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR }
  68. peer := c.newPeerCb(conn)
  69. peer.outgoing = outgoing
  70. err := c.addPeer(peer)
  71. if err != nil { return nil, err }
  72. go peer.Start(c.inQueues)
  73. return peer, nil
  74. }
  75. func (c *Client) Broadcast(chName String, msg Msg) {
  76. if atomic.LoadUint32(&c.stopped) == 1 { return }
  77. for v := range c.peersCopy().Values() {
  78. peer := v.(*Peer)
  79. success := peer.TryQueueOut(chName , msg)
  80. if !success {
  81. // TODO: notify the peer
  82. }
  83. }
  84. }
  85. func (c *Client) PopMessage(chName String) *InboundMsg {
  86. if atomic.LoadUint32(&c.stopped) == 1 { return nil }
  87. channel := c.self.Channel(chName)
  88. q := c.inQueues[chName]
  89. if q == nil { Panicf("Expected inQueues[%f], found none", chName) }
  90. for {
  91. select {
  92. case <-c.quit:
  93. return nil
  94. case inMsg := <-q:
  95. // skip if known.
  96. if channel.Filter().Has(inMsg.Msg) {
  97. continue
  98. }
  99. return inMsg
  100. }
  101. }
  102. }
  103. // Updates self's filter for a channel & broadcasts it.
  104. // TODO: rename, same name is confusing.
  105. func (c *Client) UpdateFilter(chName String, filter Filter) {
  106. if atomic.LoadUint32(&c.stopped) == 1 { return }
  107. c.self.Channel(chName).UpdateFilter(filter)
  108. c.Broadcast("", &NewFilterMsg{
  109. ChName: chName,
  110. Filter: filter,
  111. })
  112. }
  113. func (c *Client) StopPeer(peer *Peer) {
  114. // lock
  115. c.mtx.Lock()
  116. peerValue, _ := c.peers.Remove(peer.RemoteAddress())
  117. c.mtx.Unlock()
  118. // unlock
  119. peer_ := peerValue.(*Peer)
  120. if peer_ != nil {
  121. peer_.Stop()
  122. }
  123. }
  124. func (c *Client) addPeer(peer *Peer) error {
  125. addr := peer.RemoteAddress()
  126. // lock & defer
  127. c.mtx.Lock(); defer c.mtx.Unlock()
  128. if c.stopped == 1 { return CLIENT_STOPPED_ERROR }
  129. if !c.peers.Has(addr) {
  130. c.peers.Put(addr, peer)
  131. return nil
  132. } else {
  133. // ignore duplicate peer for addr.
  134. log.Infof("Ignoring duplicate peer for addr %v", addr)
  135. return CLIENT_DUPLICATE_PEER_ERROR
  136. }
  137. // unlock deferred
  138. }
  139. func (c *Client) peersCopy() merkle.Tree {
  140. // lock & defer
  141. c.mtx.Lock(); defer c.mtx.Unlock()
  142. return c.peers.Copy()
  143. // unlock deferred
  144. }