You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

315 lines
7.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
10 years ago
10 years ago
9 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "sync/atomic"
  8. "time"
  9. . "github.com/tendermint/tendermint/common"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. type Reactor interface {
  13. Start(sw *Switch)
  14. Stop()
  15. GetChannels() []*ChannelDescriptor
  16. AddPeer(peer *Peer)
  17. RemovePeer(peer *Peer, reason interface{})
  18. Receive(chId byte, peer *Peer, msgBytes []byte)
  19. }
  20. //--------------------------------------
  21. type BaseReactor struct{}
  22. func (_ BaseReactor) Start(sw *Switch) {}
  23. func (_ BaseReactor) Stop() {}
  24. func (_ BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  25. func (_ BaseReactor) AddPeer(peer *Peer) {}
  26. func (_ BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  27. func (_ BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {}
  28. //-----------------------------------------------------------------------------
  29. /*
  30. The `Switch` handles peer connections and exposes an API to receive incoming messages
  31. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  32. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  33. incoming messages are received on the reactor.
  34. */
  35. type Switch struct {
  36. listeners []Listener
  37. reactors map[string]Reactor
  38. chDescs []*ChannelDescriptor
  39. reactorsByCh map[byte]Reactor
  40. peers *PeerSet
  41. dialing *CMap
  42. running uint32
  43. nodeInfo *types.NodeInfo // our node info
  44. }
  // ErrSwitchDuplicatePeer is returned by AddPeerWithConnection when the peer
  // set refuses the new peer.
  var ErrSwitchDuplicatePeer = errors.New("Duplicate peer")

  // peerDialTimeoutSeconds is the outbound dial timeout, in seconds, used by
  // DialPeerWithAddress.
  const peerDialTimeoutSeconds = 3
  51. func NewSwitch() *Switch {
  52. sw := &Switch{
  53. reactors: make(map[string]Reactor),
  54. chDescs: make([]*ChannelDescriptor, 0),
  55. reactorsByCh: make(map[byte]Reactor),
  56. peers: NewPeerSet(),
  57. dialing: NewCMap(),
  58. running: 0,
  59. nodeInfo: nil,
  60. }
  61. return sw
  62. }
  63. // Not goroutine safe.
  64. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  65. // Validate the reactor.
  66. // No two reactors can share the same channel.
  67. reactorChannels := reactor.GetChannels()
  68. for _, chDesc := range reactorChannels {
  69. chId := chDesc.Id
  70. if sw.reactorsByCh[chId] != nil {
  71. panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chId, sw.reactorsByCh[chId], reactor))
  72. }
  73. sw.chDescs = append(sw.chDescs, chDesc)
  74. sw.reactorsByCh[chId] = reactor
  75. }
  76. sw.reactors[name] = reactor
  77. return reactor
  78. }
  79. // Not goroutine safe.
  80. func (sw *Switch) Reactors() map[string]Reactor {
  81. return sw.reactors
  82. }
  83. // Not goroutine safe.
  84. func (sw *Switch) Reactor(name string) Reactor {
  85. return sw.reactors[name]
  86. }
  87. // Not goroutine safe.
  88. func (sw *Switch) AddListener(l Listener) {
  89. sw.listeners = append(sw.listeners, l)
  90. }
  91. // Not goroutine safe.
  92. func (sw *Switch) Listeners() []Listener {
  93. return sw.listeners
  94. }
  95. // Not goroutine safe.
  96. func (sw *Switch) IsListening() bool {
  97. return len(sw.listeners) > 0
  98. }
  99. // Not goroutine safe.
  100. func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) {
  101. sw.nodeInfo = nodeInfo
  102. }
  103. func (sw *Switch) Start() {
  104. if atomic.CompareAndSwapUint32(&sw.running, 0, 1) {
  105. // Start reactors
  106. for _, reactor := range sw.reactors {
  107. reactor.Start(sw)
  108. }
  109. // Start peers
  110. for _, peer := range sw.peers.List() {
  111. sw.startInitPeer(peer)
  112. }
  113. // Start listeners
  114. for _, listener := range sw.listeners {
  115. go sw.listenerRoutine(listener)
  116. }
  117. }
  118. }
  119. func (sw *Switch) Stop() {
  120. if atomic.CompareAndSwapUint32(&sw.running, 1, 0) {
  121. // Stop listeners
  122. for _, listener := range sw.listeners {
  123. listener.Stop()
  124. }
  125. sw.listeners = nil
  126. // Stop peers
  127. for _, peer := range sw.peers.List() {
  128. peer.stop()
  129. }
  130. sw.peers = NewPeerSet()
  131. // Stop reactors
  132. for _, reactor := range sw.reactors {
  133. reactor.Stop()
  134. }
  135. }
  136. }
  137. // NOTE: This performs a blocking handshake before the peer is added.
  138. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  139. // First, perform handshake
  140. peerNodeInfo, err := peerHandshake(conn, sw.nodeInfo)
  141. if err != nil {
  142. return nil, err
  143. }
  144. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  145. return nil, err
  146. }
  147. // the peerNodeInfo is not verified,
  148. // so we overwrite the IP with that from the conn
  149. // and if we dialed out, the port too
  150. // everything else we just have to trust
  151. ip, port, _ := net.SplitHostPort(conn.RemoteAddr().String())
  152. peerNodeInfo.Host = ip
  153. if outbound {
  154. porti, _ := strconv.Atoi(port)
  155. peerNodeInfo.P2PPort = uint16(porti)
  156. }
  157. peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  158. // Add the peer to .peers
  159. if sw.peers.Add(peer) {
  160. log.Info("Added peer", "peer", peer)
  161. } else {
  162. log.Info("Ignoring duplicate peer", "peer", peer)
  163. return nil, ErrSwitchDuplicatePeer
  164. }
  165. if atomic.LoadUint32(&sw.running) == 1 {
  166. sw.startInitPeer(peer)
  167. }
  168. return peer, nil
  169. }
  170. func (sw *Switch) startInitPeer(peer *Peer) {
  171. peer.start()
  172. sw.addPeerToReactors(peer)
  173. }
  174. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  175. log.Debug("Dialing address", "address", addr)
  176. sw.dialing.Set(addr.IP.String(), addr)
  177. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  178. sw.dialing.Delete(addr.IP.String())
  179. if err != nil {
  180. log.Debug("Failed dialing address", "address", addr, "error", err)
  181. return nil, err
  182. }
  183. peer, err := sw.AddPeerWithConnection(conn, true)
  184. if err != nil {
  185. log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
  186. return nil, err
  187. }
  188. log.Info("Dialed and added peer", "address", addr, "peer", peer)
  189. return peer, nil
  190. }
  191. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  192. return sw.dialing.Has(addr.IP.String())
  193. }
  194. // Broadcast runs a go routine for each attempted send, which will block
  195. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  196. // which receives success values for each attempted send (false if times out)
  197. func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
  198. successChan := make(chan bool, len(sw.peers.List()))
  199. log.Debug("Broadcast", "channel", chId, "msg", msg)
  200. for _, peer := range sw.peers.List() {
  201. go func(peer *Peer) {
  202. success := peer.Send(chId, msg)
  203. successChan <- success
  204. }(peer)
  205. }
  206. return successChan
  207. }
  208. // Returns the count of outbound/inbound and outbound-dialing peers.
  209. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  210. peers := sw.peers.List()
  211. for _, peer := range peers {
  212. if peer.outbound {
  213. outbound++
  214. } else {
  215. inbound++
  216. }
  217. }
  218. dialing = sw.dialing.Size()
  219. return
  220. }
  221. func (sw *Switch) Peers() IPeerSet {
  222. return sw.peers
  223. }
  224. // Disconnect from a peer due to external error.
  225. // TODO: make record depending on reason.
  226. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  227. log.Info("Stopping peer for error", "peer", peer, "error", reason)
  228. sw.peers.Remove(peer)
  229. peer.stop()
  230. sw.removePeerFromReactors(peer, reason)
  231. }
  232. // Disconnect from a peer gracefully.
  233. // TODO: handle graceful disconnects.
  234. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  235. log.Info("Stopping peer gracefully")
  236. sw.peers.Remove(peer)
  237. peer.stop()
  238. sw.removePeerFromReactors(peer, nil)
  239. }
  240. func (sw *Switch) addPeerToReactors(peer *Peer) {
  241. for _, reactor := range sw.reactors {
  242. reactor.AddPeer(peer)
  243. }
  244. }
  245. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  246. for _, reactor := range sw.reactors {
  247. reactor.RemovePeer(peer, reason)
  248. }
  249. }
  250. func (sw *Switch) listenerRoutine(l Listener) {
  251. for {
  252. inConn, ok := <-l.Connections()
  253. if !ok {
  254. break
  255. }
  256. // New inbound connection!
  257. peer, err := sw.AddPeerWithConnection(inConn, false)
  258. if err != nil {
  259. log.Info(Fmt("Ignoring error from inbound connection: %v\n%v", peer, err))
  260. continue
  261. }
  262. // NOTE: We don't yet have the external address of the
  263. // remote (if they have a listener at all).
  264. // PEXReactor's pexRoutine will handle that.
  265. }
  266. // cleanup
  267. }
  268. //-----------------------------------------------------------------------------
  269. type SwitchEventNewPeer struct {
  270. Peer *Peer
  271. }
  272. type SwitchEventDonePeer struct {
  273. Peer *Peer
  274. Error interface{}
  275. }