You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
5.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync/atomic"
  8. "time"
  9. . "github.com/tendermint/tendermint/common"
  10. )
  11. type Reactor interface {
  12. Start(sw *Switch)
  13. Stop()
  14. GetChannels() []*ChannelDescriptor
  15. AddPeer(peer *Peer)
  16. RemovePeer(peer *Peer, reason interface{})
  17. Receive(chId byte, peer *Peer, msgBytes []byte)
  18. }
  19. //-----------------------------------------------------------------------------
  20. /*
  21. The `Switch` handles peer connections and exposes an API to receive incoming messages
  22. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  23. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  24. incoming messages are received on the reactor.
  25. */
  26. type Switch struct {
  27. reactors []Reactor
  28. chDescs []*ChannelDescriptor
  29. reactorsByCh map[byte]Reactor
  30. peers *PeerSet
  31. dialing *CMap
  32. listeners *CMap // listenerName -> chan interface{}
  33. quit chan struct{}
  34. started uint32
  35. stopped uint32
  36. chainId string
  37. }
  38. var (
  39. ErrSwitchStopped = errors.New("Switch already stopped")
  40. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  41. )
  42. const (
  43. peerDialTimeoutSeconds = 3
  44. )
  45. func NewSwitch(reactors []Reactor) *Switch {
  46. // Validate the reactors. no two reactors can share the same channel.
  47. chDescs := []*ChannelDescriptor{}
  48. reactorsByCh := make(map[byte]Reactor)
  49. for _, reactor := range reactors {
  50. reactorChannels := reactor.GetChannels()
  51. for _, chDesc := range reactorChannels {
  52. chId := chDesc.Id
  53. if reactorsByCh[chId] != nil {
  54. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, reactorsByCh[chId], reactor))
  55. }
  56. chDescs = append(chDescs, chDesc)
  57. reactorsByCh[chId] = reactor
  58. }
  59. }
  60. sw := &Switch{
  61. reactors: reactors,
  62. chDescs: chDescs,
  63. reactorsByCh: reactorsByCh,
  64. peers: NewPeerSet(),
  65. dialing: NewCMap(),
  66. listeners: NewCMap(),
  67. quit: make(chan struct{}),
  68. stopped: 0,
  69. }
  70. return sw
  71. }
  72. func (sw *Switch) Start() {
  73. if atomic.CompareAndSwapUint32(&sw.started, 0, 1) {
  74. log.Info("Starting Switch")
  75. for _, reactor := range sw.reactors {
  76. reactor.Start(sw)
  77. }
  78. }
  79. }
  80. func (sw *Switch) Stop() {
  81. if atomic.CompareAndSwapUint32(&sw.stopped, 0, 1) {
  82. log.Info("Stopping Switch")
  83. close(sw.quit)
  84. // Stop each peer.
  85. for _, peer := range sw.peers.List() {
  86. peer.stop()
  87. }
  88. sw.peers = NewPeerSet()
  89. // Stop all reactors.
  90. for _, reactor := range sw.reactors {
  91. reactor.Stop()
  92. }
  93. }
  94. }
  95. func (sw *Switch) Reactors() []Reactor {
  96. return sw.reactors
  97. }
  98. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  99. if atomic.LoadUint32(&sw.stopped) == 1 {
  100. return nil, ErrSwitchStopped
  101. }
  102. peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  103. // Add the peer to .peers
  104. if sw.peers.Add(peer) {
  105. log.Info("Added peer", "peer", peer)
  106. } else {
  107. log.Info("Ignoring duplicate peer", "peer", peer)
  108. return nil, ErrSwitchDuplicatePeer
  109. }
  110. // Start the peer
  111. go peer.start()
  112. // Notify listeners.
  113. sw.doAddPeer(peer)
  114. // Send handshake
  115. msg := &pexHandshakeMessage{ChainId: sw.chainId}
  116. peer.Send(PexCh, msg)
  117. return peer, nil
  118. }
  119. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  120. if atomic.LoadUint32(&sw.stopped) == 1 {
  121. return nil, ErrSwitchStopped
  122. }
  123. log.Info("Dialing peer", "address", addr)
  124. sw.dialing.Set(addr.String(), addr)
  125. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  126. sw.dialing.Delete(addr.String())
  127. if err != nil {
  128. return nil, err
  129. }
  130. peer, err := sw.AddPeerWithConnection(conn, true)
  131. if err != nil {
  132. return nil, err
  133. }
  134. return peer, nil
  135. }
  136. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  137. return sw.dialing.Has(addr.String())
  138. }
  139. // Broadcast runs a go routine for each attemptted send, which will block
  140. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  141. // which receives success values for each attempted send (false if times out)
  142. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  143. if atomic.LoadUint32(&sw.stopped) == 1 {
  144. return nil
  145. }
  146. successChan := make(chan bool, len(sw.peers.List()))
  147. log.Debug("Broadcast", "channel", chId, "msg", msg)
  148. for _, peer := range sw.peers.List() {
  149. go func() {
  150. success := peer.Send(chId, msg)
  151. successChan <- success
  152. }()
  153. }
  154. return successChan
  155. }
  156. // Returns the count of outbound/inbound and outbound-dialing peers.
  157. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  158. peers := sw.peers.List()
  159. for _, peer := range peers {
  160. if peer.outbound {
  161. outbound++
  162. } else {
  163. inbound++
  164. }
  165. }
  166. dialing = sw.dialing.Size()
  167. return
  168. }
  169. func (sw *Switch) Peers() IPeerSet {
  170. return sw.peers
  171. }
  172. // Disconnect from a peer due to external error.
  173. // TODO: make record depending on reason.
  174. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  175. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  176. sw.peers.Remove(peer)
  177. peer.stop()
  178. // Notify listeners
  179. sw.doRemovePeer(peer, reason)
  180. }
  181. // Disconnect from a peer gracefully.
  182. // TODO: handle graceful disconnects.
  183. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  184. log.Info("Stopping peer gracefully")
  185. sw.peers.Remove(peer)
  186. peer.stop()
  187. // Notify listeners
  188. sw.doRemovePeer(peer, nil)
  189. }
  190. func (sw *Switch) SetChainId(hash []byte, network string) {
  191. sw.chainId = hex.EncodeToString(hash) + "-" + network
  192. }
  193. func (sw *Switch) IsListening() bool {
  194. return sw.listeners.Size() > 0
  195. }
  196. func (sw *Switch) doAddPeer(peer *Peer) {
  197. for _, reactor := range sw.reactors {
  198. reactor.AddPeer(peer)
  199. }
  200. }
  201. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  202. for _, reactor := range sw.reactors {
  203. reactor.RemovePeer(peer, reason)
  204. }
  205. }
  206. //-----------------------------------------------------------------------------
  207. type SwitchEventNewPeer struct {
  208. Peer *Peer
  209. }
  210. type SwitchEventDonePeer struct {
  211. Peer *Peer
  212. Error interface{}
  213. }