You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
3.9 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package peer
  2. import (
  3. . "github.com/tendermint/tendermint/common"
  4. . "github.com/tendermint/tendermint/binary"
  5. "github.com/tendermint/tendermint/merkle"
  6. "sync/atomic"
  7. "sync"
  8. "errors"
  9. )
  10. /* Client
  11. A client is half of a p2p system.
  12. It can reach out to the network and establish connections with servers.
  13. A client doesn't listen for incoming connections -- that's done by the server.
  14. newPeerCb is a factory method for generating new peers from new *Connections.
  15. newPeerCb(nil) must return a prototypical peer that represents the self "peer".
  16. XXX what about peer disconnects?
  17. */
  18. type Client struct {
  19. addrBook AddrBook
  20. targetNumPeers int
  21. newPeerCb func(*Connection) *Peer
  22. self *Peer
  23. inQueues map[String]chan *InboundMsg
  24. mtx sync.Mutex
  25. peers merkle.Tree // addr -> *Peer
  26. quit chan struct{}
  27. stopped uint32
  28. }
  29. var (
  30. CLIENT_STOPPED_ERROR = errors.New("Client already stopped")
  31. CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
  32. )
  33. func NewClient(newPeerCb func(*Connection) *Peer) *Client {
  34. self := newPeerCb(nil)
  35. if self == nil {
  36. Panicf("newPeerCb(nil) must return a prototypical peer for self")
  37. }
  38. inQueues := make(map[String]chan *InboundMsg)
  39. for chName, channel := range self.channels {
  40. inQueues[chName] = make(chan *InboundMsg)
  41. }
  42. c := &Client{
  43. newPeerCb: newPeerCb,
  44. peers: merkle.NewIAVLTree(nil),
  45. self: self,
  46. inQueues: inQueues,
  47. }
  48. return c
  49. }
  50. func (c *Client) Stop() {
  51. // lock
  52. c.mtx.Lock()
  53. if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
  54. close(c.quit)
  55. // stop each peer.
  56. for peerValue := range c.peers.Values() {
  57. peer := peerValue.(*Peer)
  58. peer.Stop()
  59. }
  60. // empty tree.
  61. c.peers = merkle.NewIAVLTree(nil)
  62. }
  63. c.mtx.Unlock()
  64. // unlock
  65. }
  66. func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
  67. if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR }
  68. peer := c.newPeerCb(conn)
  69. peer.outgoing = outgoing
  70. err := c.addPeer(peer)
  71. if err != nil { return nil, err }
  72. go peer.Start(c.inQueues)
  73. return peer, nil
  74. }
  75. func (c *Client) Broadcast(chName String, msg Msg) {
  76. if atomic.LoadUint32(&c.stopped) == 1 { return }
  77. for v := range c.peersCopy().Values() {
  78. peer := v.(*Peer)
  79. success := peer.TryQueueOut(chName , msg)
  80. if !success {
  81. // TODO: notify the peer
  82. }
  83. }
  84. }
  85. func (c *Client) PopMessage(chName String) *InboundMsg {
  86. if atomic.LoadUint32(&c.stopped) == 1 { return nil }
  87. channel := c.self.Channel(chName)
  88. q := c.inQueues[chName]
  89. if q == nil { Panicf("Expected inQueues[%f], found none", chName) }
  90. for {
  91. select {
  92. case <-c.quit:
  93. return nil
  94. case inMsg := <-q:
  95. // skip if known.
  96. if channel.Has(inMsg.Msg) {
  97. continue
  98. }
  99. return inMsg
  100. }
  101. }
  102. }
  103. func (c *Client) StopPeer(peer *Peer) {
  104. // lock
  105. c.mtx.Lock()
  106. peerValue, _ := c.peers.Remove(peer.RemoteAddress())
  107. c.mtx.Unlock()
  108. // unlock
  109. peer_ := peerValue.(*Peer)
  110. if peer_ != nil {
  111. peer_.Stop()
  112. }
  113. }
  114. func (c *Client) addPeer(peer *Peer) error {
  115. addr := peer.RemoteAddress()
  116. // lock & defer
  117. c.mtx.Lock(); defer c.mtx.Unlock()
  118. if c.stopped == 1 { return CLIENT_STOPPED_ERROR }
  119. if !c.peers.Has(addr) {
  120. c.peers.Put(addr, peer)
  121. return nil
  122. } else {
  123. // ignore duplicate peer for addr.
  124. log.Infof("Ignoring duplicate peer for addr %v", addr)
  125. return CLIENT_DUPLICATE_PEER_ERROR
  126. }
  127. // unlock deferred
  128. }
  129. func (c *Client) peersCopy() merkle.Tree {
  130. // lock & defer
  131. c.mtx.Lock(); defer c.mtx.Unlock()
  132. return c.peers.Copy()
  133. // unlock deferred
  134. }