You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

554 lines
14 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-crypto"
  11. "github.com/tendermint/log15"
  12. )
  13. type Reactor interface {
  14. Service // Start, Stop
  15. SetSwitch(*Switch)
  16. GetChannels() []*ChannelDescriptor
  17. AddPeer(peer *Peer)
  18. RemovePeer(peer *Peer, reason interface{})
  19. Receive(chID byte, peer *Peer, msgBytes []byte)
  20. }
  21. //--------------------------------------
  22. type BaseReactor struct {
  23. BaseService // Provides Start, Stop, .Quit
  24. Switch *Switch
  25. }
  26. func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
  27. return &BaseReactor{
  28. BaseService: *NewBaseService(log, name, impl),
  29. Switch: nil,
  30. }
  31. }
  32. func (br *BaseReactor) SetSwitch(sw *Switch) {
  33. br.Switch = sw
  34. }
  35. func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  36. func (_ *BaseReactor) AddPeer(peer *Peer) {}
  37. func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  38. func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
  39. //-----------------------------------------------------------------------------
  40. /*
  41. The `Switch` handles peer connections and exposes an API to receive incoming messages
  42. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. incoming messages are received on the reactor.
  45. */
  46. type Switch struct {
  47. BaseService
  48. config cfg.Config
  49. listeners []Listener
  50. reactors map[string]Reactor
  51. chDescs []*ChannelDescriptor
  52. reactorsByCh map[byte]Reactor
  53. peers *PeerSet
  54. dialing *CMap
  55. nodeInfo *NodeInfo // our node info
  56. nodePrivKey crypto.PrivKeyEd25519 // our node privkey
  57. filterConnByAddr func(net.Addr) error
  58. filterConnByPubKey func(crypto.PubKeyEd25519) error
  59. }
  // Sentinel errors of the switch. Neither is returned directly by the code in
  // this file; presumably they originate from PeerSet.Add — verify there.
  var (
  	// ErrSwitchDuplicatePeer signals that the peer is already known.
  	ErrSwitchDuplicatePeer = errors.New("Duplicate peer")

  	// ErrSwitchMaxPeersPerIPRange signals that the peer's IP range is full.
  	ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
  )
  64. func NewSwitch(config cfg.Config) *Switch {
  65. setConfigDefaults(config)
  66. sw := &Switch{
  67. config: config,
  68. reactors: make(map[string]Reactor),
  69. chDescs: make([]*ChannelDescriptor, 0),
  70. reactorsByCh: make(map[byte]Reactor),
  71. peers: NewPeerSet(),
  72. dialing: NewCMap(),
  73. nodeInfo: nil,
  74. }
  75. sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
  76. return sw
  77. }
  78. // Not goroutine safe.
  79. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  80. // Validate the reactor.
  81. // No two reactors can share the same channel.
  82. reactorChannels := reactor.GetChannels()
  83. for _, chDesc := range reactorChannels {
  84. chID := chDesc.ID
  85. if sw.reactorsByCh[chID] != nil {
  86. PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  87. }
  88. sw.chDescs = append(sw.chDescs, chDesc)
  89. sw.reactorsByCh[chID] = reactor
  90. }
  91. sw.reactors[name] = reactor
  92. reactor.SetSwitch(sw)
  93. return reactor
  94. }
  95. // Not goroutine safe.
  96. func (sw *Switch) Reactors() map[string]Reactor {
  97. return sw.reactors
  98. }
  99. // Not goroutine safe.
  100. func (sw *Switch) Reactor(name string) Reactor {
  101. return sw.reactors[name]
  102. }
  103. // Not goroutine safe.
  104. func (sw *Switch) AddListener(l Listener) {
  105. sw.listeners = append(sw.listeners, l)
  106. }
  107. // Not goroutine safe.
  108. func (sw *Switch) Listeners() []Listener {
  109. return sw.listeners
  110. }
  111. // Not goroutine safe.
  112. func (sw *Switch) IsListening() bool {
  113. return len(sw.listeners) > 0
  114. }
  115. // Not goroutine safe.
  116. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
  117. sw.nodeInfo = nodeInfo
  118. }
  119. // Not goroutine safe.
  120. func (sw *Switch) NodeInfo() *NodeInfo {
  121. return sw.nodeInfo
  122. }
  123. // Not goroutine safe.
  124. // NOTE: Overwrites sw.nodeInfo.PubKey
  125. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
  126. sw.nodePrivKey = nodePrivKey
  127. if sw.nodeInfo != nil {
  128. sw.nodeInfo.PubKey = nodePrivKey.PubKey().(crypto.PubKeyEd25519)
  129. }
  130. }
  131. // Switch.Start() starts all the reactors, peers, and listeners.
  132. func (sw *Switch) OnStart() error {
  133. sw.BaseService.OnStart()
  134. // Start reactors
  135. for _, reactor := range sw.reactors {
  136. _, err := reactor.Start()
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. // Start peers
  142. for _, peer := range sw.peers.List() {
  143. sw.startInitPeer(peer)
  144. }
  145. // Start listeners
  146. for _, listener := range sw.listeners {
  147. go sw.listenerRoutine(listener)
  148. }
  149. return nil
  150. }
  151. func (sw *Switch) OnStop() {
  152. sw.BaseService.OnStop()
  153. // Stop listeners
  154. for _, listener := range sw.listeners {
  155. listener.Stop()
  156. }
  157. sw.listeners = nil
  158. // Stop peers
  159. for _, peer := range sw.peers.List() {
  160. peer.Stop()
  161. sw.peers.Remove(peer)
  162. }
  163. // Stop reactors
  164. for _, reactor := range sw.reactors {
  165. reactor.Stop()
  166. }
  167. }
  168. // NOTE: This performs a blocking handshake before the peer is added.
  169. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
  170. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  171. // Filter by addr (ie. ip:port)
  172. if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
  173. conn.Close()
  174. return nil, err
  175. }
  176. // Set deadline for handshake so we don't block forever on conn.ReadFull
  177. conn.SetDeadline(time.Now().Add(
  178. time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
  179. // First, encrypt the connection.
  180. var sconn net.Conn = conn
  181. if sw.config.GetBool(configKeyAuthEnc) {
  182. var err error
  183. sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
  184. if err != nil {
  185. conn.Close()
  186. return nil, err
  187. }
  188. }
  189. // Filter by p2p-key
  190. if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
  191. sconn.Close()
  192. return nil, err
  193. }
  194. // Then, perform node handshake
  195. peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
  196. if err != nil {
  197. sconn.Close()
  198. return nil, err
  199. }
  200. if sw.config.GetBool(configKeyAuthEnc) {
  201. // Check that the professed PubKey matches the sconn's.
  202. if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
  203. sconn.Close()
  204. return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
  205. peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey())
  206. }
  207. }
  208. // Avoid self
  209. if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
  210. sconn.Close()
  211. return nil, fmt.Errorf("Ignoring connection from self")
  212. }
  213. // Check version, chain id
  214. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  215. sconn.Close()
  216. return nil, err
  217. }
  218. peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  219. // Add the peer to .peers
  220. // ignore if duplicate or if we already have too many for that IP range
  221. if err := sw.peers.Add(peer); err != nil {
  222. log.Notice("Ignoring peer", "error", err, "peer", peer)
  223. peer.Stop()
  224. return nil, err
  225. }
  226. // remove deadline and start peer
  227. conn.SetDeadline(time.Time{})
  228. if sw.IsRunning() {
  229. sw.startInitPeer(peer)
  230. }
  231. log.Notice("Added peer", "peer", peer)
  232. return peer, nil
  233. }
  234. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  235. if sw.filterConnByAddr != nil {
  236. return sw.filterConnByAddr(addr)
  237. }
  238. return nil
  239. }
  240. func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
  241. if sw.filterConnByPubKey != nil {
  242. return sw.filterConnByPubKey(pubkey)
  243. }
  244. return nil
  245. }
  246. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  247. sw.filterConnByAddr = f
  248. }
  249. func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
  250. sw.filterConnByPubKey = f
  251. }
  252. func (sw *Switch) startInitPeer(peer *Peer) {
  253. peer.Start() // spawn send/recv routines
  254. sw.addPeerToReactors(peer) // run AddPeer on each reactor
  255. }
  256. // Dial a list of seeds asynchronously in random order
  257. func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
  258. netAddrs, err := NewNetAddressStrings(seeds)
  259. if err != nil {
  260. return err
  261. }
  262. if addrBook != nil {
  263. // add seeds to `addrBook`
  264. ourAddrS := sw.nodeInfo.ListenAddr
  265. ourAddr, _ := NewNetAddressString(ourAddrS)
  266. for _, netAddr := range netAddrs {
  267. // do not add ourselves
  268. if netAddr.Equals(ourAddr) {
  269. continue
  270. }
  271. addrBook.AddAddress(netAddr, ourAddr)
  272. }
  273. addrBook.Save()
  274. }
  275. // permute the list, dial them in random order.
  276. perm := rand.Perm(len(netAddrs))
  277. for i := 0; i < len(perm); i++ {
  278. go func(i int) {
  279. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  280. j := perm[i]
  281. sw.dialSeed(netAddrs[j])
  282. }(i)
  283. }
  284. return nil
  285. }
  286. func (sw *Switch) dialSeed(addr *NetAddress) {
  287. peer, err := sw.DialPeerWithAddress(addr)
  288. if err != nil {
  289. log.Error("Error dialing seed", "error", err)
  290. return
  291. } else {
  292. log.Notice("Connected to seed", "peer", peer)
  293. }
  294. }
  295. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  296. log.Info("Dialing address", "address", addr)
  297. sw.dialing.Set(addr.IP.String(), addr)
  298. conn, err := addr.DialTimeout(time.Duration(
  299. sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
  300. sw.dialing.Delete(addr.IP.String())
  301. if err != nil {
  302. log.Info("Failed dialing address", "address", addr, "error", err)
  303. return nil, err
  304. }
  305. if sw.config.GetBool(configFuzzEnable) {
  306. conn = FuzzConn(sw.config, conn)
  307. }
  308. peer, err := sw.AddPeerWithConnection(conn, true)
  309. if err != nil {
  310. log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
  311. return nil, err
  312. }
  313. log.Notice("Dialed and added peer", "address", addr, "peer", peer)
  314. return peer, nil
  315. }
  316. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  317. return sw.dialing.Has(addr.IP.String())
  318. }
  319. // Broadcast runs a go routine for each attempted send, which will block
  320. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  321. // which receives success values for each attempted send (false if times out)
  322. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  323. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
  324. successChan := make(chan bool, len(sw.peers.List()))
  325. log.Debug("Broadcast", "channel", chID, "msg", msg)
  326. for _, peer := range sw.peers.List() {
  327. go func(peer *Peer) {
  328. success := peer.Send(chID, msg)
  329. successChan <- success
  330. }(peer)
  331. }
  332. return successChan
  333. }
  334. // Returns the count of outbound/inbound and outbound-dialing peers.
  335. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  336. peers := sw.peers.List()
  337. for _, peer := range peers {
  338. if peer.outbound {
  339. outbound++
  340. } else {
  341. inbound++
  342. }
  343. }
  344. dialing = sw.dialing.Size()
  345. return
  346. }
  347. func (sw *Switch) Peers() IPeerSet {
  348. return sw.peers
  349. }
  350. // Disconnect from a peer due to external error.
  351. // TODO: make record depending on reason.
  352. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  353. log.Notice("Stopping peer for error", "peer", peer, "error", reason)
  354. sw.peers.Remove(peer)
  355. peer.Stop()
  356. sw.removePeerFromReactors(peer, reason)
  357. }
  358. // Disconnect from a peer gracefully.
  359. // TODO: handle graceful disconnects.
  360. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  361. log.Notice("Stopping peer gracefully")
  362. sw.peers.Remove(peer)
  363. peer.Stop()
  364. sw.removePeerFromReactors(peer, nil)
  365. }
  366. func (sw *Switch) addPeerToReactors(peer *Peer) {
  367. for _, reactor := range sw.reactors {
  368. reactor.AddPeer(peer)
  369. }
  370. }
  371. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  372. for _, reactor := range sw.reactors {
  373. reactor.RemovePeer(peer, reason)
  374. }
  375. }
  376. func (sw *Switch) listenerRoutine(l Listener) {
  377. for {
  378. inConn, ok := <-l.Connections()
  379. if !ok {
  380. break
  381. }
  382. // ignore connection if we already have enough
  383. maxPeers := sw.config.GetInt(configKeyMaxNumPeers)
  384. if maxPeers <= sw.peers.Size() {
  385. log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  386. continue
  387. }
  388. if sw.config.GetBool(configFuzzEnable) {
  389. inConn = FuzzConn(sw.config, inConn)
  390. }
  391. // New inbound connection!
  392. _, err := sw.AddPeerWithConnection(inConn, false)
  393. if err != nil {
  394. log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
  395. continue
  396. }
  397. // NOTE: We don't yet have the listening port of the
  398. // remote (if they have a listener at all).
  399. // The peerHandshake will handle that
  400. }
  401. // cleanup
  402. }
  403. //-----------------------------------------------------------------------------
  404. type SwitchEventNewPeer struct {
  405. Peer *Peer
  406. }
  407. type SwitchEventDonePeer struct {
  408. Peer *Peer
  409. Error interface{}
  410. }
  411. //------------------------------------------------------------------
  412. // Switches connected via arbitrary net.Conn; useful for testing
  413. // Returns n switches, connected according to the connect func.
  414. // If connect==Connect2Switches, the switches will be fully connected.
  415. // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
  416. // NOTE: panics if any switch fails to start.
  417. func MakeConnectedSwitches(n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
  418. switches := make([]*Switch, n)
  419. for i := 0; i < n; i++ {
  420. switches[i] = makeSwitch(i, "testing", "123.123.123", initSwitch)
  421. }
  422. if err := StartSwitches(switches); err != nil {
  423. panic(err)
  424. }
  425. for i := 0; i < n; i++ {
  426. for j := i; j < n; j++ {
  427. connect(switches, i, j)
  428. }
  429. }
  430. return switches
  431. }
  // PanicOnAddPeerErr, when true, makes Connect2Switches panic if
  // AddPeerWithConnection returns an error. It defaults to false.
  var PanicOnAddPeerErr bool
  433. // Will connect switches i and j via net.Pipe()
  434. // Blocks until a conection is established.
  435. // NOTE: caller ensures i and j are within bounds
  436. func Connect2Switches(switches []*Switch, i, j int) {
  437. switchI := switches[i]
  438. switchJ := switches[j]
  439. c1, c2 := net.Pipe()
  440. doneCh := make(chan struct{})
  441. go func() {
  442. _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
  443. if PanicOnAddPeerErr && err != nil {
  444. panic(err)
  445. }
  446. doneCh <- struct{}{}
  447. }()
  448. go func() {
  449. _, err := switchJ.AddPeerWithConnection(c2, true)
  450. if PanicOnAddPeerErr && err != nil {
  451. panic(err)
  452. }
  453. doneCh <- struct{}{}
  454. }()
  455. <-doneCh
  456. <-doneCh
  457. }
  458. func StartSwitches(switches []*Switch) error {
  459. for _, s := range switches {
  460. _, err := s.Start() // start switch and reactors
  461. if err != nil {
  462. return err
  463. }
  464. }
  465. return nil
  466. }
  467. func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
  468. privKey := crypto.GenPrivKeyEd25519()
  469. // new switch, add reactors
  470. // TODO: let the config be passed in?
  471. s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil)))
  472. s.SetNodeInfo(&NodeInfo{
  473. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  474. Moniker: Fmt("switch%d", i),
  475. Network: network,
  476. Version: version,
  477. })
  478. s.SetNodePrivKey(privKey)
  479. return s
  480. }