You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

310 lines
7.7 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. . "github.com/tendermint/tendermint/common"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. type Reactor interface {
  12. Start(sw *Switch)
  13. Stop()
  14. GetChannels() []*ChannelDescriptor
  15. AddPeer(peer *Peer)
  16. RemovePeer(peer *Peer, reason interface{})
  17. Receive(chId byte, peer *Peer, msgBytes []byte)
  18. }
  19. //--------------------------------------
  20. type BaseReactor struct{}
  21. func (_ BaseReactor) Start(sw *Switch) {}
  22. func (_ BaseReactor) Stop() {}
  23. func (_ BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  24. func (_ BaseReactor) AddPeer(peer *Peer) {}
  25. func (_ BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  26. func (_ BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {}
  27. //-----------------------------------------------------------------------------
  28. /*
  29. The `Switch` handles peer connections and exposes an API to receive incoming messages
  30. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  31. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  32. incoming messages are received on the reactor.
  33. */
  34. type Switch struct {
  35. listeners []Listener
  36. reactors map[string]Reactor
  37. chDescs []*ChannelDescriptor
  38. reactorsByCh map[byte]Reactor
  39. peers *PeerSet
  40. dialing *CMap
  41. running uint32
  42. nodeInfo *types.NodeInfo // our node info
  43. }
  44. var (
  45. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  46. )
  47. const (
  48. peerDialTimeoutSeconds = 3
  49. )
  50. func NewSwitch() *Switch {
  51. sw := &Switch{
  52. reactors: make(map[string]Reactor),
  53. chDescs: make([]*ChannelDescriptor, 0),
  54. reactorsByCh: make(map[byte]Reactor),
  55. peers: NewPeerSet(),
  56. dialing: NewCMap(),
  57. running: 0,
  58. nodeInfo: nil,
  59. }
  60. return sw
  61. }
  62. // Not goroutine safe.
  63. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  64. // Validate the reactor.
  65. // No two reactors can share the same channel.
  66. reactorChannels := reactor.GetChannels()
  67. for _, chDesc := range reactorChannels {
  68. chId := chDesc.Id
  69. if sw.reactorsByCh[chId] != nil {
  70. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
  71. }
  72. sw.chDescs = append(sw.chDescs, chDesc)
  73. sw.reactorsByCh[chId] = reactor
  74. }
  75. sw.reactors[name] = reactor
  76. return reactor
  77. }
  78. // Not goroutine safe.
  79. func (sw *Switch) Reactors() map[string]Reactor {
  80. return sw.reactors
  81. }
  82. // Not goroutine safe.
  83. func (sw *Switch) Reactor(name string) Reactor {
  84. return sw.reactors[name]
  85. }
  86. // Not goroutine safe.
  87. func (sw *Switch) AddListener(l Listener) {
  88. sw.listeners = append(sw.listeners, l)
  89. }
  90. // Not goroutine safe.
  91. func (sw *Switch) Listeners() []Listener {
  92. return sw.listeners
  93. }
  94. // Not goroutine safe.
  95. func (sw *Switch) IsListening() bool {
  96. return len(sw.listeners) > 0
  97. }
  98. // Not goroutine safe.
  99. func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) {
  100. sw.nodeInfo = nodeInfo
  101. }
  102. func (sw *Switch) Start() {
  103. if atomic.CompareAndSwapUint32(&sw.running, 0, 1) {
  104. // Start reactors
  105. for _, reactor := range sw.reactors {
  106. reactor.Start(sw)
  107. }
  108. // Start peers
  109. for _, peer := range sw.peers.List() {
  110. sw.startInitPeer(peer)
  111. }
  112. // Start listeners
  113. for _, listener := range sw.listeners {
  114. go sw.listenerRoutine(listener)
  115. }
  116. }
  117. }
  118. func (sw *Switch) Stop() {
  119. if atomic.CompareAndSwapUint32(&sw.running, 1, 0) {
  120. // Stop listeners
  121. for _, listener := range sw.listeners {
  122. listener.Stop()
  123. }
  124. sw.listeners = nil
  125. // Stop peers
  126. for _, peer := range sw.peers.List() {
  127. peer.stop()
  128. }
  129. sw.peers = NewPeerSet()
  130. // Stop reactors
  131. for _, reactor := range sw.reactors {
  132. reactor.Stop()
  133. }
  134. }
  135. }
  136. // NOTE: This performs a blocking handshake before the peer is added.
  137. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  138. // First, perform handshake
  139. peerNodeInfo, err := peerHandshake(conn, sw.nodeInfo)
  140. if err != nil {
  141. return nil, err
  142. }
  143. if peerNodeInfo.Network != sw.nodeInfo.Network {
  144. return nil, fmt.Errorf("Peer is on different network %v", peerNodeInfo.Network)
  145. }
  146. peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  147. // Add the peer to .peers
  148. if sw.peers.Add(peer) {
  149. log.Info("Added peer", "peer", peer)
  150. } else {
  151. log.Info("Ignoring duplicate peer", "peer", peer)
  152. return nil, ErrSwitchDuplicatePeer
  153. }
  154. if atomic.LoadUint32(&sw.running) == 1 {
  155. sw.startInitPeer(peer)
  156. }
  157. return peer, nil
  158. }
  159. func (sw *Switch) startInitPeer(peer *Peer) {
  160. // Start the peer
  161. peer.start()
  162. // Notify reactors
  163. sw.doAddPeer(peer)
  164. }
  165. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  166. log.Debug("Dialing address", "address", addr)
  167. sw.dialing.Set(addr.String(), addr)
  168. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  169. sw.dialing.Delete(addr.String())
  170. if err != nil {
  171. log.Debug("Failed dialing address", "address", addr, "error", err)
  172. return nil, err
  173. }
  174. peer, err := sw.AddPeerWithConnection(conn, true)
  175. if err != nil {
  176. log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
  177. return nil, err
  178. }
  179. log.Info("Dialed and added peer", "address", addr, "peer", peer)
  180. return peer, nil
  181. }
  182. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  183. return sw.dialing.Has(addr.String())
  184. }
  185. // Broadcast runs a go routine for each attempted send, which will block
  186. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  187. // which receives success values for each attempted send (false if times out)
  188. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  189. successChan := make(chan bool, len(sw.peers.List()))
  190. log.Debug("Broadcast", "channel", chId, "msg", msg)
  191. for _, peer := range sw.peers.List() {
  192. go func(peer *Peer) {
  193. success := peer.Send(chId, msg)
  194. successChan <- success
  195. }(peer)
  196. }
  197. return successChan
  198. }
  199. // Returns the count of outbound/inbound and outbound-dialing peers.
  200. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  201. peers := sw.peers.List()
  202. for _, peer := range peers {
  203. if peer.outbound {
  204. outbound++
  205. } else {
  206. inbound++
  207. }
  208. }
  209. dialing = sw.dialing.Size()
  210. return
  211. }
  212. func (sw *Switch) Peers() IPeerSet {
  213. return sw.peers
  214. }
  215. // Disconnect from a peer due to external error.
  216. // TODO: make record depending on reason.
  217. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  218. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  219. sw.peers.Remove(peer)
  220. peer.stop()
  221. // Notify reactors
  222. sw.doRemovePeer(peer, reason)
  223. }
  224. // Disconnect from a peer gracefully.
  225. // TODO: handle graceful disconnects.
  226. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  227. log.Info("Stopping peer gracefully")
  228. sw.peers.Remove(peer)
  229. peer.stop()
  230. // Notify reactors
  231. sw.doRemovePeer(peer, nil)
  232. }
  233. func (sw *Switch) doAddPeer(peer *Peer) {
  234. for _, reactor := range sw.reactors {
  235. reactor.AddPeer(peer)
  236. }
  237. }
  238. func (sw *Switch) doRemovePeer(peer *Peer, reason interface{}) {
  239. for _, reactor := range sw.reactors {
  240. reactor.RemovePeer(peer, reason)
  241. }
  242. }
  243. func (sw *Switch) listenerRoutine(l Listener) {
  244. for {
  245. inConn, ok := <-l.Connections()
  246. if !ok {
  247. break
  248. }
  249. // New inbound connection!
  250. peer, err := sw.AddPeerWithConnection(inConn, false)
  251. if err != nil {
  252. log.Info(Fmt("Ignoring error from inbound connection: %v\n%v", peer, err))
  253. continue
  254. }
  255. // NOTE: We don't yet have the external address of the
  256. // remote (if they have a listener at all).
  257. // PEXReactor's pexRoutine will handle that.
  258. }
  259. // cleanup
  260. }
  261. //-----------------------------------------------------------------------------
  262. type SwitchEventNewPeer struct {
  263. Peer *Peer
  264. }
  265. type SwitchEventDonePeer struct {
  266. Peer *Peer
  267. Error interface{}
  268. }