You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

250 lines
5.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "time"
  7. . "github.com/tendermint/tendermint2/common"
  8. )
  9. type Reactor interface {
  10. Start(sw *Switch)
  11. Stop()
  12. GetChannels() []*ChannelDescriptor
  13. AddPeer(peer *Peer)
  14. RemovePeer(peer *Peer, reason interface{})
  15. Receive(chId byte, peer *Peer, msgBytes []byte)
  16. }
  17. //-----------------------------------------------------------------------------
  18. /*
  19. The `Switch` handles peer connections and exposes an API to receive incoming messages
  20. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  21. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  22. incoming messages are received on the reactor.
  23. */
  24. type Switch struct {
  25. network string
  26. reactors map[string]Reactor
  27. chDescs []*ChannelDescriptor
  28. reactorsByCh map[byte]Reactor
  29. peers *PeerSet
  30. dialing *CMap
  31. listeners *CMap // listenerName -> chan interface{}
  32. }
  33. var (
  34. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  35. )
  36. const (
  37. peerDialTimeoutSeconds = 3
  38. )
  39. func NewSwitch() *Switch {
  40. sw := &Switch{
  41. network: "",
  42. reactors: make(map[string]Reactor),
  43. chDescs: make([]*ChannelDescriptor, 0),
  44. reactorsByCh: make(map[byte]Reactor),
  45. peers: NewPeerSet(),
  46. dialing: NewCMap(),
  47. listeners: NewCMap(),
  48. }
  49. return sw
  50. }
  51. // Not goroutine safe.
  52. func (sw *Switch) SetNetwork(network string) {
  53. sw.network = network
  54. }
  55. // Not goroutine safe.
  56. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  57. // Validate the reactor.
  58. // No two reactors can share the same channel.
  59. reactorChannels := reactor.GetChannels()
  60. for _, chDesc := range reactorChannels {
  61. chId := chDesc.Id
  62. if sw.reactorsByCh[chId] != nil {
  63. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
  64. }
  65. sw.chDescs = append(sw.chDescs, chDesc)
  66. sw.reactorsByCh[chId] = reactor
  67. }
  68. sw.reactors[name] = reactor
  69. return reactor
  70. }
  71. func (sw *Switch) Reactor(name string) Reactor {
  72. return sw.reactors[name]
  73. }
  74. // Convenience function
  75. func (sw *Switch) StartReactors() {
  76. for _, reactor := range sw.reactors {
  77. reactor.Start(sw)
  78. }
  79. }
  80. // Convenience function
  81. func (sw *Switch) StopReactors() {
  82. // Stop all reactors.
  83. for _, reactor := range sw.reactors {
  84. reactor.Stop()
  85. }
  86. }
  87. // Convenience function
  88. func (sw *Switch) StopPeers() {
  89. // Stop each peer.
  90. for _, peer := range sw.peers.List() {
  91. peer.stop()
  92. }
  93. sw.peers = NewPeerSet()
  94. }
  95. // Convenience function
  96. func (sw *Switch) Stop() {
  97. sw.StopPeers()
  98. sw.StopReactors()
  99. }
  100. // Not goroutine safe to modify.
  101. func (sw *Switch) Reactors() map[string]Reactor {
  102. return sw.reactors
  103. }
  104. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  105. peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  106. // Add the peer to .peers
  107. if sw.peers.Add(peer) {
  108. log.Info("Added peer", "peer", peer)
  109. } else {
  110. log.Info("Ignoring duplicate peer", "peer", peer)
  111. return nil, ErrSwitchDuplicatePeer
  112. }
  113. // Start the peer
  114. peer.start()
  115. // Notify listeners.
  116. sw.doAddPeer(peer)
  117. // Send handshake
  118. msg := &pexHandshakeMessage{Network: sw.network}
  119. peer.Send(PexChannel, msg)
  120. return peer, nil
  121. }
  122. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  123. log.Debug("Dialing address", "address", addr)
  124. sw.dialing.Set(addr.String(), addr)
  125. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  126. sw.dialing.Delete(addr.String())
  127. if err != nil {
  128. log.Debug("Failed dialing address", "address", addr, "error", err)
  129. return nil, err
  130. }
  131. peer, err := sw.AddPeerWithConnection(conn, true)
  132. if err != nil {
  133. log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
  134. return nil, err
  135. }
  136. log.Info("Dialed and added peer", "address", addr, "peer", peer)
  137. return peer, nil
  138. }
  139. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  140. return sw.dialing.Has(addr.String())
  141. }
  142. // Broadcast runs a go routine for each attempted send, which will block
  143. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  144. // which receives success values for each attempted send (false if times out)
  145. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  146. successChan := make(chan bool, len(sw.peers.List()))
  147. log.Debug("Broadcast", "channel", chId, "msg", msg)
  148. for _, peer := range sw.peers.List() {
  149. go func() {
  150. success := peer.Send(chId, msg)
  151. successChan <- success
  152. }()
  153. }
  154. return successChan
  155. }
  156. // Returns the count of outbound/inbound and outbound-dialing peers.
  157. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  158. peers := sw.peers.List()
  159. for _, peer := range peers {
  160. if peer.outbound {
  161. outbound++
  162. } else {
  163. inbound++
  164. }
  165. }
  166. dialing = sw.dialing.Size()
  167. return
  168. }
  169. func (sw *Switch) Peers() IPeerSet {
  170. return sw.peers
  171. }
  172. // Disconnect from a peer due to external error.
  173. // TODO: make record depending on reason.
  174. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  175. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  176. sw.peers.Remove(peer)
  177. peer.stop()
  178. // Notify listeners
  179. sw.doRemovePeer(peer, reason)
  180. }
  181. // Disconnect from a peer gracefully.
  182. // TODO: handle graceful disconnects.
  183. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  184. log.Info("Stopping peer gracefully")
  185. sw.peers.Remove(peer)
  186. peer.stop()
  187. // Notify listeners
  188. sw.doRemovePeer(peer, nil)
  189. }
  190. func (sw *Switch) IsListening() bool {
  191. return sw.listeners.Size() > 0
  192. }
  193. func (sw *Switch) doAddPeer(peer *Peer) {
  194. for _, reactor := range sw.reactors {
  195. reactor.AddPeer(peer)
  196. }
  197. }
  198. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  199. for _, reactor := range sw.reactors {
  200. reactor.RemovePeer(peer, reason)
  201. }
  202. }
  203. //-----------------------------------------------------------------------------
  204. type SwitchEventNewPeer struct {
  205. Peer *Peer
  206. }
  207. type SwitchEventDonePeer struct {
  208. Peer *Peer
  209. Error interface{}
  210. }