You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
6.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync/atomic"
  8. "time"
  9. . "github.com/tendermint/tendermint/common"
  10. )
  11. type Reactor interface {
  12. Start(sw *Switch)
  13. Stop()
  14. GetChannels() []*ChannelDescriptor
  15. AddPeer(peer *Peer)
  16. RemovePeer(peer *Peer, reason interface{})
  17. Receive(chId byte, peer *Peer, msgBytes []byte)
  18. }
  19. //-----------------------------------------------------------------------------
  20. /*
  21. The `Switch` handles peer connections and exposes an API to receive incoming messages
  22. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  23. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  24. incoming messages are received on the reactor.
  25. */
  26. type Switch struct {
  27. reactors []Reactor
  28. chDescs []*ChannelDescriptor
  29. reactorsByCh map[byte]Reactor
  30. peers *PeerSet
  31. dialing *CMap
  32. listeners *CMap // listenerName -> chan interface{}
  33. quit chan struct{}
  34. started uint32
  35. stopped uint32
  36. chainId string
  37. }
  38. var (
  39. ErrSwitchStopped = errors.New("Switch already stopped")
  40. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  41. )
  42. const (
  43. peerDialTimeoutSeconds = 3
  44. )
  45. func NewSwitch(reactors []Reactor) *Switch {
  46. // Validate the reactors. no two reactors can share the same channel.
  47. chDescs := []*ChannelDescriptor{}
  48. reactorsByCh := make(map[byte]Reactor)
  49. for _, reactor := range reactors {
  50. reactorChannels := reactor.GetChannels()
  51. for _, chDesc := range reactorChannels {
  52. chId := chDesc.Id
  53. if reactorsByCh[chId] != nil {
  54. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor))
  55. }
  56. chDescs = append(chDescs, chDesc)
  57. reactorsByCh[chId] = reactor
  58. }
  59. }
  60. sw := &Switch{
  61. reactors: reactors,
  62. chDescs: chDescs,
  63. reactorsByCh: reactorsByCh,
  64. peers: NewPeerSet(),
  65. dialing: NewCMap(),
  66. listeners: NewCMap(),
  67. quit: make(chan struct{}),
  68. stopped: 0,
  69. }
  70. return sw
  71. }
  72. func (sw *Switch) Start() {
  73. if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
  74. log.Info("Starting Switch")
  75. for _, reactor := range sw.reactors {
  76. reactor.Start(sw)
  77. }
  78. }
  79. }
  80. func (sw *Switch) Stop() {
  81. if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
  82. log.Info("Stopping Switch")
  83. close(sw.quit)
  84. // Stop each peer.
  85. for _, peer := range sw.peers.List() {
  86. peer.stop()
  87. }
  88. sw.peers = NewPeerSet()
  89. // Stop all reactors.
  90. for _, reactor := range sw.reactors {
  91. reactor.Stop()
  92. }
  93. }
  94. }
  95. func (sw *Switch) Reactors() []Reactor {
  96. return sw.reactors
  97. }
  98. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  99. if atomic.LoadUint32(&sw.stopped) == 1 {
  100. return nil, ErrSwitchStopped
  101. }
  102. peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  103. // Add the peer to .peers
  104. if sw.peers.Add(peer) {
  105. log.Info("Added peer", "peer", peer)
  106. } else {
  107. log.Info("Ignoring duplicate peer", "peer", peer)
  108. return nil, ErrSwitchDuplicatePeer
  109. }
  110. // Start the peer
  111. go peer.start()
  112. // Notify listeners.
  113. sw.doAddPeer(peer)
  114. // Send handshake
  115. msg := &pexHandshakeMessage{ChainId: sw.chainId}
  116. peer.Send(PexCh, msg)
  117. return peer, nil
  118. }
  119. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  120. if atomic.LoadUint32(&sw.stopped) == 1 {
  121. return nil, ErrSwitchStopped
  122. }
  123. log.Debug("Dialing address", "address", addr)
  124. sw.dialing.Set(addr.String(), addr)
  125. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  126. sw.dialing.Delete(addr.String())
  127. if err != nil {
  128. log.Debug("Failed dialing address", "address", addr, "error", err)
  129. return nil, err
  130. }
  131. peer, err := sw.AddPeerWithConnection(conn, true)
  132. if err != nil {
  133. log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
  134. return nil, err
  135. }
  136. log.Info("Dialed and added peer", "address", addr, "peer", peer)
  137. return peer, nil
  138. }
  139. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  140. return sw.dialing.Has(addr.String())
  141. }
  142. // Broadcast runs a go routine for each attemptted send, which will block
  143. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  144. // which receives success values for each attempted send (false if times out)
  145. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  146. if atomic.LoadUint32(&sw.stopped) == 1 {
  147. return nil
  148. }
  149. successChan := make(chan bool, len(sw.peers.List()))
  150. log.Debug("Broadcast", "channel", chId, "msg", msg)
  151. for _, peer := range sw.peers.List() {
  152. go func() {
  153. success := peer.Send(chId, msg)
  154. successChan <- success
  155. }()
  156. }
  157. return successChan
  158. }
  159. // Returns the count of outbound/inbound and outbound-dialing peers.
  160. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  161. peers := sw.peers.List()
  162. for _, peer := range peers {
  163. if peer.outbound {
  164. outbound++
  165. } else {
  166. inbound++
  167. }
  168. }
  169. dialing = sw.dialing.Size()
  170. return
  171. }
  172. func (sw *Switch) Peers() IPeerSet {
  173. return sw.peers
  174. }
  175. // Disconnect from a peer due to external error.
  176. // TODO: make record depending on reason.
  177. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  178. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  179. sw.peers.Remove(peer)
  180. peer.stop()
  181. // Notify listeners
  182. sw.doRemovePeer(peer, reason)
  183. }
  184. // Disconnect from a peer gracefully.
  185. // TODO: handle graceful disconnects.
  186. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  187. log.Info("Stopping peer gracefully")
  188. sw.peers.Remove(peer)
  189. peer.stop()
  190. // Notify listeners
  191. sw.doRemovePeer(peer, nil)
  192. }
  193. func (sw *Switch) SetChainId(hash []byte, network string) {
  194. sw.chainId = hex.EncodeToString(hash) + "-" + network
  195. }
  196. func (sw *Switch) IsListening() bool {
  197. return sw.listeners.Size() > 0
  198. }
  199. func (sw *Switch) doAddPeer(peer *Peer) {
  200. for _, reactor := range sw.reactors {
  201. reactor.AddPeer(peer)
  202. }
  203. }
  204. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  205. for _, reactor := range sw.reactors {
  206. reactor.RemovePeer(peer, reason)
  207. }
  208. }
  209. //-----------------------------------------------------------------------------
  210. type SwitchEventNewPeer struct {
  211. Peer *Peer
  212. }
  213. type SwitchEventDonePeer struct {
  214. Peer *Peer
  215. Error interface{}
  216. }