You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
6.5 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "time"
  7. . "github.com/tendermint/tendermint/common"
  8. )
  9. type Reactor interface {
  10. Start(sw *Switch)
  11. Stop()
  12. GetChannels() []*ChannelDescriptor
  13. AddPeer(peer *Peer)
  14. RemovePeer(peer *Peer, reason interface{})
  15. Receive(chId byte, peer *Peer, msgBytes []byte)
  16. }
  17. //-----------------------------------------------------------------------------
  18. /*
  19. The `Switch` handles peer connections and exposes an API to receive incoming messages
  20. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  21. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  22. incoming messages are received on the reactor.
  23. */
  24. type Switch struct {
  25. network string
  26. listeners []Listener
  27. reactors map[string]Reactor
  28. chDescs []*ChannelDescriptor
  29. reactorsByCh map[byte]Reactor
  30. peers *PeerSet
  31. dialing *CMap
  32. }
  // ErrSwitchDuplicatePeer is returned by AddPeerWithConnection when the peer
  // set refuses a peer because it is already present.
  var ErrSwitchDuplicatePeer = errors.New("Duplicate peer")

  // peerDialTimeoutSeconds bounds, in seconds, how long an outbound dial may
  // take in DialPeerWithAddress.
  const peerDialTimeoutSeconds = 3
  39. func NewSwitch() *Switch {
  40. sw := &Switch{
  41. network: "",
  42. reactors: make(map[string]Reactor),
  43. chDescs: make([]*ChannelDescriptor, 0),
  44. reactorsByCh: make(map[byte]Reactor),
  45. peers: NewPeerSet(),
  46. dialing: NewCMap(),
  47. }
  48. return sw
  49. }
  50. // Not goroutine safe.
  51. func (sw *Switch) SetNetwork(network string) {
  52. sw.network = network
  53. }
  54. // Not goroutine safe.
  55. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  56. // Validate the reactor.
  57. // No two reactors can share the same channel.
  58. reactorChannels := reactor.GetChannels()
  59. for _, chDesc := range reactorChannels {
  60. chId := chDesc.Id
  61. if sw.reactorsByCh[chId] != nil {
  62. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
  63. }
  64. sw.chDescs = append(sw.chDescs, chDesc)
  65. sw.reactorsByCh[chId] = reactor
  66. }
  67. sw.reactors[name] = reactor
  68. return reactor
  69. }
  70. func (sw *Switch) Reactor(name string) Reactor {
  71. return sw.reactors[name]
  72. }
  73. // Convenience function
  74. func (sw *Switch) StartReactors() {
  75. for _, reactor := range sw.reactors {
  76. reactor.Start(sw)
  77. }
  78. }
  79. // Convenience function
  80. func (sw *Switch) StopReactors() {
  81. // Stop all reactors.
  82. for _, reactor := range sw.reactors {
  83. reactor.Stop()
  84. }
  85. }
  86. // Convenience function
  87. func (sw *Switch) StopPeers() {
  88. // Stop each peer.
  89. for _, peer := range sw.peers.List() {
  90. peer.stop()
  91. }
  92. sw.peers = NewPeerSet()
  93. }
  94. // Convenience function
  95. func (sw *Switch) StopListeners() {
  96. // Stop each listener.
  97. for _, listener := range sw.listeners {
  98. listener.Stop()
  99. }
  100. sw.listeners = nil
  101. }
  102. // Convenience function
  103. func (sw *Switch) Stop() {
  104. sw.StopPeers()
  105. sw.StopReactors()
  106. sw.StopListeners()
  107. }
  108. // Not goroutine safe to modify.
  109. func (sw *Switch) Reactors() map[string]Reactor {
  110. return sw.reactors
  111. }
  112. func (sw *Switch) AddListener(l Listener) {
  113. sw.listeners = append(sw.listeners, l)
  114. go sw.listenerRoutine(l)
  115. }
  116. func (sw *Switch) IsListening() bool {
  117. return len(sw.listeners) > 0
  118. }
  119. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  120. peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  121. // Add the peer to .peers
  122. if sw.peers.Add(peer) {
  123. log.Info("Added peer", "peer", peer)
  124. } else {
  125. log.Info("Ignoring duplicate peer", "peer", peer)
  126. return nil, ErrSwitchDuplicatePeer
  127. }
  128. // Start the peer
  129. peer.start()
  130. // Notify reactors
  131. sw.doAddPeer(peer)
  132. // Send handshake
  133. msg := &pexHandshakeMessage{Network: sw.network}
  134. peer.Send(PexChannel, msg)
  135. return peer, nil
  136. }
  137. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  138. log.Debug("Dialing address", "address", addr)
  139. sw.dialing.Set(addr.String(), addr)
  140. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  141. sw.dialing.Delete(addr.String())
  142. if err != nil {
  143. log.Debug("Failed dialing address", "address", addr, "error", err)
  144. return nil, err
  145. }
  146. peer, err := sw.AddPeerWithConnection(conn, true)
  147. if err != nil {
  148. log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
  149. return nil, err
  150. }
  151. log.Info("Dialed and added peer", "address", addr, "peer", peer)
  152. return peer, nil
  153. }
  154. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  155. return sw.dialing.Has(addr.String())
  156. }
  157. // Broadcast runs a go routine for each attempted send, which will block
  158. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  159. // which receives success values for each attempted send (false if times out)
  160. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  161. successChan := make(chan bool, len(sw.peers.List()))
  162. log.Debug("Broadcast", "channel", chId, "msg", msg)
  163. for _, peer := range sw.peers.List() {
  164. go func() {
  165. success := peer.Send(chId, msg)
  166. successChan <- success
  167. }()
  168. }
  169. return successChan
  170. }
  171. // Returns the count of outbound/inbound and outbound-dialing peers.
  172. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  173. peers := sw.peers.List()
  174. for _, peer := range peers {
  175. if peer.outbound {
  176. outbound++
  177. } else {
  178. inbound++
  179. }
  180. }
  181. dialing = sw.dialing.Size()
  182. return
  183. }
  184. func (sw *Switch) Peers() IPeerSet {
  185. return sw.peers
  186. }
  187. // Disconnect from a peer due to external error.
  188. // TODO: make record depending on reason.
  189. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  190. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  191. sw.peers.Remove(peer)
  192. peer.stop()
  193. // Notify reactors
  194. sw.doRemovePeer(peer, reason)
  195. }
  196. // Disconnect from a peer gracefully.
  197. // TODO: handle graceful disconnects.
  198. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  199. log.Info("Stopping peer gracefully")
  200. sw.peers.Remove(peer)
  201. peer.stop()
  202. // Notify reactors
  203. sw.doRemovePeer(peer, nil)
  204. }
  205. func (sw *Switch) doAddPeer(peer *Peer) {
  206. for _, reactor := range sw.reactors {
  207. reactor.AddPeer(peer)
  208. }
  209. }
  210. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  211. for _, reactor := range sw.reactors {
  212. reactor.RemovePeer(peer, reason)
  213. }
  214. }
  215. func (sw *Switch) listenerRoutine(l Listener) {
  216. for {
  217. inConn, ok := <-l.Connections()
  218. if !ok {
  219. break
  220. }
  221. // New inbound connection!
  222. peer, err := sw.AddPeerWithConnection(inConn, false)
  223. if err != nil {
  224. log.Info("Ignoring error from inbound connection: %v\n%v",
  225. peer, err)
  226. continue
  227. }
  228. // NOTE: We don't yet have the external address of the
  229. // remote (if they have a listener at all).
  230. // PEXReactor's pexRoutine will handle that.
  231. }
  232. // cleanup
  233. }
  234. //-----------------------------------------------------------------------------
  235. type SwitchEventNewPeer struct {
  236. Peer *Peer
  237. }
  238. type SwitchEventDonePeer struct {
  239. Peer *Peer
  240. Error interface{}
  241. }