You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
5.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. . "github.com/tendermint/tendermint/common"
  9. )
  10. type Reactor interface {
  11. Start(sw *Switch)
  12. Stop()
  13. GetChannels() []*ChannelDescriptor
  14. AddPeer(peer *Peer)
  15. RemovePeer(peer *Peer, reason interface{})
  16. Receive(chId byte, peer *Peer, msgBytes []byte)
  17. }
  18. //-----------------------------------------------------------------------------
  19. /*
  20. The `Switch` handles peer connections and exposes an API to receive incoming messages
  21. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  22. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  23. incoming messages are received on the reactor.
  24. */
  25. type Switch struct {
  26. reactors []Reactor
  27. chDescs []*ChannelDescriptor
  28. reactorsByCh map[byte]Reactor
  29. peers *PeerSet
  30. dialing *CMap
  31. listeners *CMap // listenerName -> chan interface{}
  32. quit chan struct{}
  33. started uint32
  34. stopped uint32
  35. }
  36. var (
  37. ErrSwitchStopped = errors.New("Switch already stopped")
  38. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  39. )
  40. const (
  41. peerDialTimeoutSeconds = 30
  42. )
  43. func NewSwitch(reactors []Reactor) *Switch {
  44. // Validate the reactors. no two reactors can share the same channel.
  45. chDescs := []*ChannelDescriptor{}
  46. reactorsByCh := make(map[byte]Reactor)
  47. for _, reactor := range reactors {
  48. reactorChannels := reactor.GetChannels()
  49. for _, chDesc := range reactorChannels {
  50. chId := chDesc.Id
  51. if reactorsByCh[chId] != nil {
  52. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor))
  53. }
  54. chDescs = append(chDescs, chDesc)
  55. reactorsByCh[chId] = reactor
  56. }
  57. }
  58. sw := &Switch{
  59. reactors: reactors,
  60. chDescs: chDescs,
  61. reactorsByCh: reactorsByCh,
  62. peers: NewPeerSet(),
  63. dialing: NewCMap(),
  64. listeners: NewCMap(),
  65. quit: make(chan struct{}),
  66. stopped: 0,
  67. }
  68. return sw
  69. }
  70. func (sw *Switch) Start() {
  71. if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
  72. log.Info("Starting Switch")
  73. for _, reactor := range sw.reactors {
  74. reactor.Start(sw)
  75. }
  76. }
  77. }
  78. func (sw *Switch) Stop() {
  79. if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
  80. log.Info("Stopping Switch")
  81. close(sw.quit)
  82. // Stop each peer.
  83. for _, peer := range sw.peers.List() {
  84. peer.stop()
  85. }
  86. sw.peers = NewPeerSet()
  87. // Stop all reactors.
  88. for _, reactor := range sw.reactors {
  89. reactor.Stop()
  90. }
  91. }
  92. }
  93. func (sw *Switch) Reactors() []Reactor {
  94. return sw.reactors
  95. }
  96. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  97. if atomic.LoadUint32(&sw.stopped) == 1 {
  98. return nil, ErrSwitchStopped
  99. }
  100. peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  101. // Add the peer to .peers
  102. if sw.peers.Add(peer) {
  103. log.Info("Added peer", "peer", peer)
  104. } else {
  105. log.Info("Ignoring duplicate peer", "peer", peer)
  106. return nil, ErrSwitchDuplicatePeer
  107. }
  108. // Start the peer
  109. go peer.start()
  110. // Notify listeners.
  111. sw.doAddPeer(peer)
  112. return peer, nil
  113. }
  114. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  115. if atomic.LoadUint32(&sw.stopped) == 1 {
  116. return nil, ErrSwitchStopped
  117. }
  118. log.Info("Dialing peer", "address", addr)
  119. sw.dialing.Set(addr.String(), addr)
  120. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  121. sw.dialing.Delete(addr.String())
  122. if err != nil {
  123. return nil, err
  124. }
  125. peer, err := sw.AddPeerWithConnection(conn, true)
  126. if err != nil {
  127. return nil, err
  128. }
  129. return peer, nil
  130. }
  131. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  132. return sw.dialing.Has(addr.String())
  133. }
  134. // XXX: This is wrong, we can't just ignore failures on TrySend.
  135. func (sw *Switch) Broadcast(chId byte, msg interface{}) (numSuccess, numFailure int) {
  136. if atomic.LoadUint32(&sw.stopped) == 1 {
  137. return
  138. }
  139. log.Debug("Broadcast", "channel", chId, "msg", msg)
  140. for _, peer := range sw.peers.List() {
  141. // XXX XXX Change.
  142. // success := peer.TrySend(chId, msg)
  143. success := peer.Send(chId, msg)
  144. if success {
  145. numSuccess += 1
  146. } else {
  147. numFailure += 1
  148. }
  149. }
  150. return
  151. }
  152. // Returns the count of outbound/inbound and outbound-dialing peers.
  153. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  154. peers := sw.peers.List()
  155. for _, peer := range peers {
  156. if peer.outbound {
  157. outbound++
  158. } else {
  159. inbound++
  160. }
  161. }
  162. dialing = sw.dialing.Size()
  163. return
  164. }
  165. func (sw *Switch) Peers() IPeerSet {
  166. return sw.peers
  167. }
  168. // Disconnect from a peer due to external error.
  169. // TODO: make record depending on reason.
  170. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  171. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  172. sw.peers.Remove(peer)
  173. peer.stop()
  174. // Notify listeners
  175. sw.doRemovePeer(peer, reason)
  176. }
  177. // Disconnect from a peer gracefully.
  178. // TODO: handle graceful disconnects.
  179. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  180. log.Info("Stopping peer gracefully")
  181. sw.peers.Remove(peer)
  182. peer.stop()
  183. // Notify listeners
  184. sw.doRemovePeer(peer, nil)
  185. }
  186. func (sw *Switch) doAddPeer(peer *Peer) {
  187. for _, reactor := range sw.reactors {
  188. reactor.AddPeer(peer)
  189. }
  190. }
  191. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  192. for _, reactor := range sw.reactors {
  193. reactor.RemovePeer(peer, reason)
  194. }
  195. }
  196. //-----------------------------------------------------------------------------
  197. type SwitchEventNewPeer struct {
  198. Peer *Peer
  199. }
  200. type SwitchEventDonePeer struct {
  201. Peer *Peer
  202. Error interface{}
  203. }