You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
4.5 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "sync/atomic"
  5. "time"
  6. . "github.com/tendermint/tendermint/common"
  7. )
  8. /*
  9. All communication amongst peers are multiplexed by "channels".
  10. (Not the same as Go "channels")
  11. To send a message, encapsulate it into a "Packet" and send it to each peer.
  12. You can find all connected and active peers by iterating over ".Peers().List()".
  13. ".Broadcast()" is provided for convenience, but by iterating over
  14. the peers manually the caller can decide which subset receives a message.
  15. Inbound messages are received by calling ".Receive()".
  16. */
  17. type Switch struct {
  18. channels []ChannelDescriptor
  19. pktRecvQueues map[string]chan *InboundPacket
  20. peers *PeerSet
  21. dialing *CMap
  22. quit chan struct{}
  23. started uint32
  24. stopped uint32
  25. }
  // Sentinel errors returned by Switch methods.
  var (
  	// ErrSwitchStopped is returned when an operation is attempted on a stopped switch.
  	ErrSwitchStopped = errors.New("Switch already stopped")
  	// ErrSwitchDuplicatePeer is returned when the peer set refuses to add a peer.
  	ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  )

  // peerDialTimeoutSeconds is the timeout, in seconds, applied when dialing a peer.
  const peerDialTimeoutSeconds = 30
  33. func NewSwitch(channels []ChannelDescriptor) *Switch {
  34. // make pktRecvQueues...
  35. pktRecvQueues := make(map[string]chan *InboundPacket)
  36. for _, chDesc := range channels {
  37. pktRecvQueues[chDesc.Name] = make(chan *InboundPacket)
  38. }
  39. s := &Switch{
  40. channels: channels,
  41. pktRecvQueues: pktRecvQueues,
  42. peers: NewPeerSet(),
  43. dialing: NewCMap(),
  44. quit: make(chan struct{}),
  45. stopped: 0,
  46. }
  47. return s
  48. }
  49. func (s *Switch) Start() {
  50. if atomic.CompareAndSwapUint32(&s.started, 0, 1) {
  51. log.Infof("Starting switch")
  52. }
  53. }
  54. func (s *Switch) Stop() {
  55. if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
  56. log.Infof("Stopping switch")
  57. close(s.quit)
  58. // stop each peer.
  59. for _, peer := range s.peers.List() {
  60. peer.stop()
  61. }
  62. // empty tree.
  63. s.peers = NewPeerSet()
  64. }
  65. }
  66. func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) {
  67. if atomic.LoadUint32(&s.stopped) == 1 {
  68. return nil, ErrSwitchStopped
  69. }
  70. log.Infof("Adding peer with connection: %v, outbound: %v", conn, outbound)
  71. // Create channels for peer
  72. channels := map[string]*Channel{}
  73. for _, chDesc := range s.channels {
  74. channels[chDesc.Name] = newChannel(chDesc)
  75. }
  76. peer := newPeer(conn, channels)
  77. peer.outbound = outbound
  78. err := s.addPeer(peer)
  79. if err != nil {
  80. return nil, err
  81. }
  82. go peer.start(s.pktRecvQueues, s.StopPeerForError)
  83. return peer, nil
  84. }
  85. func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  86. if atomic.LoadUint32(&s.stopped) == 1 {
  87. return nil, ErrSwitchStopped
  88. }
  89. log.Infof("Dialing peer @ %v", addr)
  90. s.dialing.Set(addr.String(), addr)
  91. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  92. s.dialing.Delete(addr.String())
  93. if err != nil {
  94. return nil, err
  95. }
  96. peer, err := s.AddPeerWithConnection(conn, true)
  97. if err != nil {
  98. return nil, err
  99. }
  100. return peer, nil
  101. }
  102. func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
  103. if atomic.LoadUint32(&s.stopped) == 1 {
  104. return
  105. }
  106. log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
  107. for _, peer := range s.peers.List() {
  108. success := peer.TrySend(pkt)
  109. log.Tracef("Broadcast for peer %v success: %v", peer, success)
  110. if success {
  111. numSuccess += 1
  112. } else {
  113. numFailure += 1
  114. }
  115. }
  116. return
  117. }
  118. /*
  119. Receive blocks on a channel until a message is found.
  120. */
  121. func (s *Switch) Receive(chName string) *InboundPacket {
  122. if atomic.LoadUint32(&s.stopped) == 1 {
  123. return nil
  124. }
  125. log.Tracef("Receive on [%v]", chName)
  126. q := s.pktRecvQueues[chName]
  127. if q == nil {
  128. Panicf("Expected pktRecvQueues[%f], found none", chName)
  129. }
  130. select {
  131. case <-s.quit:
  132. return nil
  133. case inPacket := <-q:
  134. return inPacket
  135. }
  136. }
  137. func (s *Switch) NumOutboundPeers() (count int) {
  138. peers := s.peers.List()
  139. for _, peer := range peers {
  140. if peer.outbound {
  141. count++
  142. }
  143. }
  144. return
  145. }
  146. func (s *Switch) Peers() IPeerSet {
  147. return s.peers
  148. }
  149. // Disconnect from a peer due to external error.
  150. // TODO: make record depending on reason.
  151. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  152. log.Infof("%v errored: %v", peer, reason)
  153. s.StopPeer(peer, false)
  154. }
  155. // Disconnect from a peer.
  156. // If graceful is true, last message sent is a disconnect message.
  157. // TODO: handle graceful disconnects.
  158. func (s *Switch) StopPeer(peer *Peer, graceful bool) {
  159. s.peers.Remove(peer)
  160. peer.stop()
  161. }
  162. func (s *Switch) addPeer(peer *Peer) error {
  163. if s.stopped == 1 {
  164. return ErrSwitchStopped
  165. }
  166. if s.peers.Add(peer) {
  167. log.Tracef("Adding: %v", peer)
  168. return nil
  169. } else {
  170. // ignore duplicate peer
  171. log.Infof("Ignoring duplicate: %v", peer)
  172. return ErrSwitchDuplicatePeer
  173. }
  174. }