You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
5.5 KiB

10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. . "github.com/tendermint/tendermint/binary"
  8. . "github.com/tendermint/tendermint/common"
  9. )
  10. type Reactor interface {
  11. GetChannels() []*ChannelDescriptor
  12. AddPeer(peer *Peer)
  13. RemovePeer(peer *Peer, reason interface{})
  14. Receive(chId byte, peer *Peer, msgBytes []byte)
  15. }
  16. //-----------------------------------------------------------------------------
  17. /*
  18. All communication amongst peers are multiplexed by "channels".
  19. (Not the same as Go "channels")
  20. To send a message, serialize it into a ByteSlice and send it to each peer.
  21. For best performance, re-use the same immutable ByteSlice to each peer.
  22. You can also use a TypedBytes{} struct for convenience.
  23. You can find all connected and active peers by iterating over ".Peers().List()".
  24. ".Broadcast()" is provided for convenience, but by iterating over
  25. the peers manually the caller can decide which subset receives a message.
  26. Inbound messages are received by calling ".Receive()".
  27. The receiver is responsible for decoding the message bytes, which may be preceded
  28. by a single type byte if a TypedBytes{} was used.
  29. */
  30. type Switch struct {
  31. reactors []Reactor
  32. chDescs []*ChannelDescriptor
  33. reactorsByCh map[byte]Reactor
  34. peers *PeerSet
  35. dialing *CMap
  36. listeners *CMap // listenerName -> chan interface{}
  37. quit chan struct{}
  38. started uint32
  39. stopped uint32
  40. }
  41. var (
  42. ErrSwitchStopped = errors.New("Switch already stopped")
  43. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  44. )
  45. const (
  46. peerDialTimeoutSeconds = 30
  47. )
  48. func NewSwitch(reactors []Reactor) *Switch {
  49. // Validate the reactors. no two reactors can share the same channel.
  50. chDescs := []*ChannelDescriptor{}
  51. reactorsByCh := make(map[byte]Reactor)
  52. for _, reactor := range reactors {
  53. reactorChannels := reactor.GetChannels()
  54. for _, chDesc := range reactorChannels {
  55. chId := chDesc.Id
  56. if reactorsByCh[chId] != nil {
  57. Panicf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor)
  58. }
  59. chDescs = append(chDescs, chDesc)
  60. reactorsByCh[chId] = reactor
  61. }
  62. }
  63. s := &Switch{
  64. reactors: reactors,
  65. chDescs: chDescs,
  66. reactorsByCh: reactorsByCh,
  67. peers: NewPeerSet(),
  68. dialing: NewCMap(),
  69. listeners: NewCMap(),
  70. quit: make(chan struct{}),
  71. stopped: 0,
  72. }
  73. return s
  74. }
  75. func (s *Switch) Start() {
  76. if atomic.CompareAndSwapUint32(&s.started, 0, 1) {
  77. log.Info("Starting switch")
  78. }
  79. }
  80. func (s *Switch) Stop() {
  81. if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
  82. log.Info("Stopping switch")
  83. close(s.quit)
  84. // stop each peer.
  85. for _, peer := range s.peers.List() {
  86. peer.stop()
  87. }
  88. // empty tree.
  89. s.peers = NewPeerSet()
  90. }
  91. }
  92. func (s *Switch) Reactors() []Reactor {
  93. return s.reactors
  94. }
  95. func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  96. if atomic.LoadUint32(&s.stopped) == 1 {
  97. return nil, ErrSwitchStopped
  98. }
  99. peer := newPeer(conn, outbound, s.reactorsByCh, s.chDescs, s.StopPeerForError)
  100. // Add the peer to .peers
  101. if s.peers.Add(peer) {
  102. log.Info("+ %v", peer)
  103. } else {
  104. log.Info("Ignoring duplicate: %v", peer)
  105. return nil, ErrSwitchDuplicatePeer
  106. }
  107. // Start the peer
  108. go peer.start()
  109. // Notify listeners.
  110. s.doAddPeer(peer)
  111. return peer, nil
  112. }
  113. func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  114. if atomic.LoadUint32(&s.stopped) == 1 {
  115. return nil, ErrSwitchStopped
  116. }
  117. log.Info("Dialing peer @ %v", addr)
  118. s.dialing.Set(addr.String(), addr)
  119. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  120. s.dialing.Delete(addr.String())
  121. if err != nil {
  122. return nil, err
  123. }
  124. peer, err := s.AddPeerWithConnection(conn, true)
  125. if err != nil {
  126. return nil, err
  127. }
  128. return peer, nil
  129. }
  130. func (s *Switch) IsDialing(addr *NetAddress) bool {
  131. return s.dialing.Has(addr.String())
  132. }
  133. // XXX: This is wrong, we can't just ignore failures on TrySend.
  134. func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) {
  135. if atomic.LoadUint32(&s.stopped) == 1 {
  136. return
  137. }
  138. log.Debug("Broadcast on [%X]", chId, msg)
  139. for _, peer := range s.peers.List() {
  140. success := peer.TrySend(chId, msg)
  141. log.Debug("Broadcast for peer %v success: %v", peer, success)
  142. if success {
  143. numSuccess += 1
  144. } else {
  145. numFailure += 1
  146. }
  147. }
  148. return
  149. }
  150. // Returns the count of outbound/inbound and outbound-dialing peers.
  151. func (s *Switch) NumPeers() (outbound, inbound, dialing int) {
  152. peers := s.peers.List()
  153. for _, peer := range peers {
  154. if peer.outbound {
  155. outbound++
  156. } else {
  157. inbound++
  158. }
  159. }
  160. dialing = s.dialing.Size()
  161. return
  162. }
  163. func (s *Switch) Peers() IPeerSet {
  164. return s.peers
  165. }
  166. // Disconnect from a peer due to external error.
  167. // TODO: make record depending on reason.
  168. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  169. log.Info("- %v !! reason: %v", peer, reason)
  170. s.peers.Remove(peer)
  171. peer.stop()
  172. // Notify listeners
  173. s.doRemovePeer(peer, reason)
  174. }
  175. // Disconnect from a peer gracefully.
  176. // TODO: handle graceful disconnects.
  177. func (s *Switch) StopPeerGracefully(peer *Peer) {
  178. log.Info("- %v", peer)
  179. s.peers.Remove(peer)
  180. peer.stop()
  181. // Notify listeners
  182. s.doRemovePeer(peer, nil)
  183. }
  184. func (s *Switch) doAddPeer(peer *Peer) {
  185. for _, reactor := range s.reactors {
  186. reactor.AddPeer(peer)
  187. }
  188. }
  189. func (s *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  190. for _, reactor := range s.reactors {
  191. reactor.RemovePeer(peer, reason)
  192. }
  193. }
  194. //-----------------------------------------------------------------------------
  195. type SwitchEventNewPeer struct {
  196. Peer *Peer
  197. }
  198. type SwitchEventDonePeer struct {
  199. Peer *Peer
  200. Error interface{}
  201. }