You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

391 lines
10 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "time"
  8. "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
  9. acm "github.com/tendermint/tendermint/account"
  10. . "github.com/tendermint/tendermint/common"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. type Reactor interface {
  14. Service // Start, Stop
  15. SetSwitch(*Switch)
  16. GetChannels() []*ChannelDescriptor
  17. AddPeer(peer *Peer)
  18. RemovePeer(peer *Peer, reason interface{})
  19. Receive(chID byte, peer *Peer, msgBytes []byte)
  20. }
  21. //--------------------------------------
  22. type BaseReactor struct {
  23. QuitService // Provides Start, Stop, .Quit
  24. Switch *Switch
  25. }
  26. func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
  27. return &BaseReactor{
  28. QuitService: *NewQuitService(log, name, impl),
  29. Switch: nil,
  30. }
  31. }
  32. func (br *BaseReactor) SetSwitch(sw *Switch) {
  33. br.Switch = sw
  34. }
  35. func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  36. func (_ *BaseReactor) AddPeer(peer *Peer) {}
  37. func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  38. func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
  39. //-----------------------------------------------------------------------------
  40. /*
  41. The `Switch` handles peer connections and exposes an API to receive incoming messages
  42. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. incoming messages are received on the reactor.
  45. */
  46. type Switch struct {
  47. BaseService
  48. listeners []Listener
  49. reactors map[string]Reactor
  50. chDescs []*ChannelDescriptor
  51. reactorsByCh map[byte]Reactor
  52. peers *PeerSet
  53. dialing *CMap
  54. nodeInfo *types.NodeInfo // our node info
  55. nodePrivKey acm.PrivKeyEd25519 // our node privkey
  56. }
  57. var (
  58. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  59. ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
  60. )
  61. const (
  62. peerDialTimeoutSeconds = 3 // TODO make this configurable
  63. handshakeTimeoutSeconds = 20 // TODO make this configurable
  64. maxNumPeers = 50 // TODO make this configurable
  65. )
  66. func NewSwitch() *Switch {
  67. sw := &Switch{
  68. reactors: make(map[string]Reactor),
  69. chDescs: make([]*ChannelDescriptor, 0),
  70. reactorsByCh: make(map[byte]Reactor),
  71. peers: NewPeerSet(),
  72. dialing: NewCMap(),
  73. nodeInfo: nil,
  74. }
  75. sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
  76. return sw
  77. }
  78. // Not goroutine safe.
  79. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  80. // Validate the reactor.
  81. // No two reactors can share the same channel.
  82. reactorChannels := reactor.GetChannels()
  83. for _, chDesc := range reactorChannels {
  84. chID := chDesc.ID
  85. if sw.reactorsByCh[chID] != nil {
  86. PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  87. }
  88. sw.chDescs = append(sw.chDescs, chDesc)
  89. sw.reactorsByCh[chID] = reactor
  90. }
  91. sw.reactors[name] = reactor
  92. reactor.SetSwitch(sw)
  93. return reactor
  94. }
  95. // Not goroutine safe.
  96. func (sw *Switch) Reactors() map[string]Reactor {
  97. return sw.reactors
  98. }
  99. // Not goroutine safe.
  100. func (sw *Switch) Reactor(name string) Reactor {
  101. return sw.reactors[name]
  102. }
  103. // Not goroutine safe.
  104. func (sw *Switch) AddListener(l Listener) {
  105. sw.listeners = append(sw.listeners, l)
  106. }
  107. // Not goroutine safe.
  108. func (sw *Switch) Listeners() []Listener {
  109. return sw.listeners
  110. }
  111. // Not goroutine safe.
  112. func (sw *Switch) IsListening() bool {
  113. return len(sw.listeners) > 0
  114. }
  115. // Not goroutine safe.
  116. func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) {
  117. sw.nodeInfo = nodeInfo
  118. }
  119. // Not goroutine safe.
  120. func (sw *Switch) NodeInfo() *types.NodeInfo {
  121. return sw.nodeInfo
  122. }
  123. // Not goroutine safe.
  124. // NOTE: Overwrites sw.nodeInfo.PubKey
  125. func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) {
  126. sw.nodePrivKey = nodePrivKey
  127. if sw.nodeInfo != nil {
  128. sw.nodeInfo.PubKey = nodePrivKey.PubKey().(acm.PubKeyEd25519)
  129. }
  130. }
  131. // Switch.Start() starts all the reactors, peers, and listeners.
  132. func (sw *Switch) OnStart() error {
  133. sw.BaseService.OnStart()
  134. // Start reactors
  135. for _, reactor := range sw.reactors {
  136. _, err := reactor.Start()
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. // Start peers
  142. for _, peer := range sw.peers.List() {
  143. sw.startInitPeer(peer)
  144. }
  145. // Start listeners
  146. for _, listener := range sw.listeners {
  147. go sw.listenerRoutine(listener)
  148. }
  149. return nil
  150. }
  151. func (sw *Switch) OnStop() {
  152. sw.BaseService.OnStop()
  153. // Stop listeners
  154. for _, listener := range sw.listeners {
  155. listener.Stop()
  156. }
  157. sw.listeners = nil
  158. // Stop peers
  159. for _, peer := range sw.peers.List() {
  160. peer.Stop()
  161. }
  162. sw.peers = NewPeerSet()
  163. // Stop reactors
  164. for _, reactor := range sw.reactors {
  165. reactor.Stop()
  166. }
  167. }
  168. // NOTE: This performs a blocking handshake before the peer is added.
  169. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
  170. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  171. // Set deadline for handshake so we don't block forever on conn.ReadFull
  172. conn.SetDeadline(time.Now().Add(handshakeTimeoutSeconds * time.Second))
  173. // First, encrypt the connection.
  174. sconn, err := MakeSecretConnection(conn, sw.nodePrivKey)
  175. if err != nil {
  176. conn.Close()
  177. return nil, err
  178. }
  179. // Then, perform node handshake
  180. peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
  181. if err != nil {
  182. sconn.Close()
  183. return nil, err
  184. }
  185. // Check that the professed PubKey matches the sconn's.
  186. if !peerNodeInfo.PubKey.Equals(sconn.RemotePubKey()) {
  187. sconn.Close()
  188. return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
  189. peerNodeInfo.PubKey, sconn.RemotePubKey())
  190. }
  191. // Avoid self
  192. if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
  193. sconn.Close()
  194. return nil, fmt.Errorf("Ignoring connection from self")
  195. }
  196. // Check version, chain id
  197. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  198. sconn.Close()
  199. return nil, err
  200. }
  201. // The peerNodeInfo is not verified, so overwrite
  202. // the IP, and the port too if we dialed out
  203. // Everything else we just have to trust
  204. ip, port, _ := net.SplitHostPort(sconn.RemoteAddr().String())
  205. peerNodeInfo.Host = ip
  206. if outbound {
  207. porti, _ := strconv.Atoi(port)
  208. peerNodeInfo.P2PPort = uint16(porti)
  209. }
  210. peer := newPeer(sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  211. // Add the peer to .peers
  212. // ignore if duplicate or if we already have too many for that IP range
  213. if err := sw.peers.Add(peer); err != nil {
  214. log.Notice("Ignoring peer", "error", err, "peer", peer)
  215. peer.Stop()
  216. return nil, err
  217. }
  218. // remove deadline and start peer
  219. conn.SetDeadline(time.Time{})
  220. if sw.IsRunning() {
  221. sw.startInitPeer(peer)
  222. }
  223. log.Notice("Added peer", "peer", peer)
  224. return peer, nil
  225. }
  226. func (sw *Switch) startInitPeer(peer *Peer) {
  227. peer.Start() // spawn send/recv routines
  228. sw.addPeerToReactors(peer) // run AddPeer on each reactor
  229. }
  230. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  231. log.Info("Dialing address", "address", addr)
  232. sw.dialing.Set(addr.IP.String(), addr)
  233. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  234. sw.dialing.Delete(addr.IP.String())
  235. if err != nil {
  236. log.Info("Failed dialing address", "address", addr, "error", err)
  237. return nil, err
  238. }
  239. peer, err := sw.AddPeerWithConnection(conn, true)
  240. if err != nil {
  241. log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
  242. return nil, err
  243. }
  244. log.Notice("Dialed and added peer", "address", addr, "peer", peer)
  245. return peer, nil
  246. }
  247. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  248. return sw.dialing.Has(addr.IP.String())
  249. }
  250. // Broadcast runs a go routine for each attempted send, which will block
  251. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  252. // which receives success values for each attempted send (false if times out)
  253. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
  254. successChan := make(chan bool, len(sw.peers.List()))
  255. log.Info("Broadcast", "channel", chID, "msg", msg)
  256. for _, peer := range sw.peers.List() {
  257. go func(peer *Peer) {
  258. success := peer.Send(chID, msg)
  259. successChan <- success
  260. }(peer)
  261. }
  262. return successChan
  263. }
  264. // Returns the count of outbound/inbound and outbound-dialing peers.
  265. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  266. peers := sw.peers.List()
  267. for _, peer := range peers {
  268. if peer.outbound {
  269. outbound++
  270. } else {
  271. inbound++
  272. }
  273. }
  274. dialing = sw.dialing.Size()
  275. return
  276. }
  277. func (sw *Switch) Peers() IPeerSet {
  278. return sw.peers
  279. }
  280. // Disconnect from a peer due to external error.
  281. // TODO: make record depending on reason.
  282. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  283. log.Notice("Stopping peer for error", "peer", peer, "error", reason)
  284. sw.peers.Remove(peer)
  285. peer.Stop()
  286. sw.removePeerFromReactors(peer, reason)
  287. }
  288. // Disconnect from a peer gracefully.
  289. // TODO: handle graceful disconnects.
  290. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  291. log.Notice("Stopping peer gracefully")
  292. sw.peers.Remove(peer)
  293. peer.Stop()
  294. sw.removePeerFromReactors(peer, nil)
  295. }
  296. func (sw *Switch) addPeerToReactors(peer *Peer) {
  297. for _, reactor := range sw.reactors {
  298. reactor.AddPeer(peer)
  299. }
  300. }
  301. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  302. for _, reactor := range sw.reactors {
  303. reactor.RemovePeer(peer, reason)
  304. }
  305. }
  306. func (sw *Switch) listenerRoutine(l Listener) {
  307. for {
  308. inConn, ok := <-l.Connections()
  309. if !ok {
  310. break
  311. }
  312. // ignore connection if we already have enough
  313. if maxNumPeers <= sw.peers.Size() {
  314. log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxNumPeers)
  315. continue
  316. }
  317. // Ignore connections from IP ranges for which we have too many
  318. if sw.peers.HasMaxForIPRange(inConn) {
  319. log.Info("Ignoring inbound connection: already have enough peers for that IP range", "address", inConn.RemoteAddr().String())
  320. continue
  321. }
  322. // New inbound connection!
  323. _, err := sw.AddPeerWithConnection(inConn, false)
  324. if err != nil {
  325. log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
  326. continue
  327. }
  328. // NOTE: We don't yet have the listening port of the
  329. // remote (if they have a listener at all).
  330. // The peerHandshake will handle that
  331. }
  332. // cleanup
  333. }
  334. //-----------------------------------------------------------------------------
  335. type SwitchEventNewPeer struct {
  336. Peer *Peer
  337. }
  338. type SwitchEventDonePeer struct {
  339. Peer *Peer
  340. Error interface{}
  341. }