You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

241 lines
5.4 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. "time"
  6. . "github.com/tendermint/tendermint/common"
  7. )
  8. /*
  9. All communication amongst peers are multiplexed by "channels".
  10. (Not the same as Go "channels")
  11. To send a message, encapsulate it into a "Packet" and send it to each peer.
  12. You can find all connected and active peers by iterating over ".Peers().List()".
  13. ".Broadcast()" is provided for convenience, but by iterating over
  14. the peers manually the caller can decide which subset receives a message.
  15. Inbound messages are received by calling ".Receive()".
  16. */
  17. type Switch struct {
  18. channels []ChannelDescriptor
  19. pktRecvQueues map[string]chan *InboundPacket
  20. peers *PeerSet
  21. dialing *CMap
  22. listeners *CMap // name -> chan interface{}
  23. quit chan struct{}
  24. started uint32
  25. stopped uint32
  26. }
  // ErrSwitchStopped is returned by operations attempted after the switch has
  // been stopped.
  var ErrSwitchStopped = errors.New("Switch already stopped")

  // ErrSwitchDuplicatePeer is returned when the peer set refuses to add a peer.
  var ErrSwitchDuplicatePeer = errors.New("Duplicate peer")

  // peerDialTimeoutSeconds is the outbound dial timeout, in seconds.
  const peerDialTimeoutSeconds = 30
  34. func NewSwitch(channels []ChannelDescriptor) *Switch {
  35. // make pktRecvQueues...
  36. pktRecvQueues := make(map[string]chan *InboundPacket)
  37. for _, chDesc := range channels {
  38. pktRecvQueues[chDesc.Name] = make(chan *InboundPacket)
  39. }
  40. s := &Switch{
  41. channels: channels,
  42. pktRecvQueues: pktRecvQueues,
  43. peers: NewPeerSet(),
  44. dialing: NewCMap(),
  45. listeners: NewCMap(),
  46. quit: make(chan struct{}),
  47. stopped: 0,
  48. }
  49. return s
  50. }
  51. func (s *Switch) Start() {
  52. if atomic.CompareAndSwapUint32(&s.started, 0, 1) {
  53. log.Info("Starting switch")
  54. }
  55. }
  56. func (s *Switch) Stop() {
  57. if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
  58. log.Info("Stopping switch")
  59. close(s.quit)
  60. // stop each peer.
  61. for _, peer := range s.peers.List() {
  62. peer.stop()
  63. }
  64. // empty tree.
  65. s.peers = NewPeerSet()
  66. }
  67. }
  68. func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) {
  69. if atomic.LoadUint32(&s.stopped) == 1 {
  70. return nil, ErrSwitchStopped
  71. }
  72. log.Info("Adding peer with connection: %v, outbound: %v", conn, outbound)
  73. // Create channels for peer
  74. channels := map[string]*Channel{}
  75. for _, chDesc := range s.channels {
  76. channels[chDesc.Name] = newChannel(chDesc)
  77. }
  78. peer := newPeer(conn, channels)
  79. peer.outbound = outbound
  80. // Add the peer to .peers
  81. if s.peers.Add(peer) {
  82. log.Debug("Adding: %v", peer)
  83. } else {
  84. log.Info("Ignoring duplicate: %v", peer)
  85. return nil, ErrSwitchDuplicatePeer
  86. }
  87. // Start the peer
  88. go peer.start(s.pktRecvQueues, s.StopPeerForError)
  89. // Notify listeners.
  90. s.emit(SwitchEventNewPeer{Peer: peer})
  91. return peer, nil
  92. }
  93. func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  94. if atomic.LoadUint32(&s.stopped) == 1 {
  95. return nil, ErrSwitchStopped
  96. }
  97. log.Info("Dialing peer @ %v", addr)
  98. s.dialing.Set(addr.String(), addr)
  99. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  100. s.dialing.Delete(addr.String())
  101. if err != nil {
  102. return nil, err
  103. }
  104. peer, err := s.AddPeerWithConnection(conn, true)
  105. if err != nil {
  106. return nil, err
  107. }
  108. return peer, nil
  109. }
  110. func (s *Switch) IsDialing(addr *NetAddress) bool {
  111. return s.dialing.Has(addr.String())
  112. }
  113. func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
  114. if atomic.LoadUint32(&s.stopped) == 1 {
  115. return
  116. }
  117. log.Debug("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
  118. for _, peer := range s.peers.List() {
  119. success := peer.TrySend(pkt)
  120. log.Debug("Broadcast for peer %v success: %v", peer, success)
  121. if success {
  122. numSuccess += 1
  123. } else {
  124. numFailure += 1
  125. }
  126. }
  127. return
  128. }
  129. // The events are of type SwitchEvent* defined below.
  130. // Switch does not close these listeners.
  131. func (s *Switch) AddEventListener(name string, listener chan<- interface{}) {
  132. s.listeners.Set(name, listener)
  133. }
  134. func (s *Switch) RemoveEventListener(name string) {
  135. s.listeners.Delete(name)
  136. }
  137. /*
  138. Receive blocks on a channel until a message is found.
  139. */
  140. func (s *Switch) Receive(chName string) *InboundPacket {
  141. if atomic.LoadUint32(&s.stopped) == 1 {
  142. return nil
  143. }
  144. q := s.pktRecvQueues[chName]
  145. if q == nil {
  146. Panicf("Expected pktRecvQueues[%f], found none", chName)
  147. }
  148. select {
  149. case <-s.quit:
  150. return nil
  151. case inPacket := <-q:
  152. log.Debug("Received packet on [%v]", chName)
  153. return inPacket
  154. }
  155. }
  156. // Returns the count of outbound/inbound and outbound-dialing peers.
  157. func (s *Switch) NumPeers() (outbound, inbound, dialing int) {
  158. peers := s.peers.List()
  159. for _, peer := range peers {
  160. if peer.outbound {
  161. outbound++
  162. } else {
  163. inbound++
  164. }
  165. }
  166. dialing = s.dialing.Size()
  167. return
  168. }
  169. func (s *Switch) Peers() IPeerSet {
  170. return s.peers
  171. }
  172. // Disconnect from a peer due to external error.
  173. // TODO: make record depending on reason.
  174. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  175. log.Info("%v errored: %v", peer, reason)
  176. s.peers.Remove(peer)
  177. peer.stop()
  178. // Notify listeners
  179. s.emit(SwitchEventDonePeer{Peer: peer, Error: reason})
  180. }
  181. // Disconnect from a peer gracefully.
  182. // TODO: handle graceful disconnects.
  183. func (s *Switch) StopPeerGracefully(peer *Peer) {
  184. s.peers.Remove(peer)
  185. peer.stop()
  186. // Notify listeners
  187. s.emit(SwitchEventDonePeer{Peer: peer})
  188. }
  189. func (s *Switch) emit(event interface{}) {
  190. for _, ch_i := range s.listeners.Values() {
  191. ch := ch_i.(chan<- interface{})
  192. ch <- event
  193. }
  194. }
  195. //-----------------------------------------------------------------------------
  196. type SwitchEventNewPeer struct {
  197. Peer *Peer
  198. }
  199. type SwitchEventDonePeer struct {
  200. Peer *Peer
  201. Error interface{}
  202. }