You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

540 lines
14 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-crypto"
  11. "github.com/tendermint/log15"
  12. )
  13. type Reactor interface {
  14. Service // Start, Stop
  15. SetSwitch(*Switch)
  16. GetChannels() []*ChannelDescriptor
  17. AddPeer(peer *Peer)
  18. RemovePeer(peer *Peer, reason interface{})
  19. Receive(chID byte, peer *Peer, msgBytes []byte)
  20. }
  21. //--------------------------------------
  22. type BaseReactor struct {
  23. BaseService // Provides Start, Stop, .Quit
  24. Switch *Switch
  25. }
  26. func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
  27. return &BaseReactor{
  28. BaseService: *NewBaseService(log, name, impl),
  29. Switch: nil,
  30. }
  31. }
  32. func (br *BaseReactor) SetSwitch(sw *Switch) {
  33. br.Switch = sw
  34. }
  35. func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  36. func (_ *BaseReactor) AddPeer(peer *Peer) {}
  37. func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  38. func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
  39. //-----------------------------------------------------------------------------
  40. /*
  41. The `Switch` handles peer connections and exposes an API to receive incoming messages
  42. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. incoming messages are received on the reactor.
  45. */
  46. type Switch struct {
  47. BaseService
  48. config cfg.Config
  49. listeners []Listener
  50. reactors map[string]Reactor
  51. chDescs []*ChannelDescriptor
  52. reactorsByCh map[byte]Reactor
  53. peers *PeerSet
  54. dialing *CMap
  55. nodeInfo *NodeInfo // our node info
  56. nodePrivKey crypto.PrivKeyEd25519 // our node privkey
  57. filterConnByAddr func(net.Addr) error
  58. filterConnByPubKey func(crypto.PubKeyEd25519) error
  59. }
  // ErrSwitchDuplicatePeer is the "Duplicate peer" sentinel.
  // NOTE(review): presumably returned by PeerSet.Add when the peer is
  // already present — confirm against peer_set.go.
  var ErrSwitchDuplicatePeer = errors.New("Duplicate peer")

  // ErrSwitchMaxPeersPerIPRange is the sentinel for an IP range that already
  // holds too many peers. NOTE(review): presumably returned by PeerSet.Add.
  var ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
  64. func NewSwitch(config cfg.Config) *Switch {
  65. setConfigDefaults(config)
  66. sw := &Switch{
  67. config: config,
  68. reactors: make(map[string]Reactor),
  69. chDescs: make([]*ChannelDescriptor, 0),
  70. reactorsByCh: make(map[byte]Reactor),
  71. peers: NewPeerSet(),
  72. dialing: NewCMap(),
  73. nodeInfo: nil,
  74. }
  75. sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
  76. return sw
  77. }
  78. // Not goroutine safe.
  79. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  80. // Validate the reactor.
  81. // No two reactors can share the same channel.
  82. reactorChannels := reactor.GetChannels()
  83. for _, chDesc := range reactorChannels {
  84. chID := chDesc.ID
  85. if sw.reactorsByCh[chID] != nil {
  86. PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  87. }
  88. sw.chDescs = append(sw.chDescs, chDesc)
  89. sw.reactorsByCh[chID] = reactor
  90. }
  91. sw.reactors[name] = reactor
  92. reactor.SetSwitch(sw)
  93. return reactor
  94. }
  95. // Not goroutine safe.
  96. func (sw *Switch) Reactors() map[string]Reactor {
  97. return sw.reactors
  98. }
  99. // Not goroutine safe.
  100. func (sw *Switch) Reactor(name string) Reactor {
  101. return sw.reactors[name]
  102. }
  103. // Not goroutine safe.
  104. func (sw *Switch) AddListener(l Listener) {
  105. sw.listeners = append(sw.listeners, l)
  106. }
  107. // Not goroutine safe.
  108. func (sw *Switch) Listeners() []Listener {
  109. return sw.listeners
  110. }
  111. // Not goroutine safe.
  112. func (sw *Switch) IsListening() bool {
  113. return len(sw.listeners) > 0
  114. }
  115. // Not goroutine safe.
  116. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
  117. sw.nodeInfo = nodeInfo
  118. }
  119. // Not goroutine safe.
  120. func (sw *Switch) NodeInfo() *NodeInfo {
  121. return sw.nodeInfo
  122. }
  123. // Not goroutine safe.
  124. // NOTE: Overwrites sw.nodeInfo.PubKey
  125. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
  126. sw.nodePrivKey = nodePrivKey
  127. if sw.nodeInfo != nil {
  128. sw.nodeInfo.PubKey = nodePrivKey.PubKey().(crypto.PubKeyEd25519)
  129. }
  130. }
  131. // Switch.Start() starts all the reactors, peers, and listeners.
  132. func (sw *Switch) OnStart() error {
  133. sw.BaseService.OnStart()
  134. // Start reactors
  135. for _, reactor := range sw.reactors {
  136. _, err := reactor.Start()
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. // Start peers
  142. for _, peer := range sw.peers.List() {
  143. sw.startInitPeer(peer)
  144. }
  145. // Start listeners
  146. for _, listener := range sw.listeners {
  147. go sw.listenerRoutine(listener)
  148. }
  149. return nil
  150. }
  151. func (sw *Switch) OnStop() {
  152. sw.BaseService.OnStop()
  153. // Stop listeners
  154. for _, listener := range sw.listeners {
  155. listener.Stop()
  156. }
  157. sw.listeners = nil
  158. // Stop peers
  159. for _, peer := range sw.peers.List() {
  160. peer.Stop()
  161. sw.peers.Remove(peer)
  162. }
  163. // Stop reactors
  164. for _, reactor := range sw.reactors {
  165. reactor.Stop()
  166. }
  167. }
  168. // NOTE: This performs a blocking handshake before the peer is added.
  169. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
  170. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  171. // Filter by addr (ie. ip:port)
  172. if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
  173. conn.Close()
  174. return nil, err
  175. }
  176. // Set deadline for handshake so we don't block forever on conn.ReadFull
  177. conn.SetDeadline(time.Now().Add(
  178. time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
  179. // First, encrypt the connection.
  180. var sconn net.Conn = conn
  181. if sw.config.GetBool(configKeyAuthEnc) {
  182. var err error
  183. sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
  184. if err != nil {
  185. conn.Close()
  186. return nil, err
  187. }
  188. }
  189. // Filter by p2p-key
  190. if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
  191. sconn.Close()
  192. return nil, err
  193. }
  194. // Then, perform node handshake
  195. peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
  196. if err != nil {
  197. sconn.Close()
  198. return nil, err
  199. }
  200. if sw.config.GetBool(configKeyAuthEnc) {
  201. // Check that the professed PubKey matches the sconn's.
  202. if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
  203. sconn.Close()
  204. return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
  205. peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey())
  206. }
  207. }
  208. // Avoid self
  209. if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
  210. sconn.Close()
  211. return nil, fmt.Errorf("Ignoring connection from self")
  212. }
  213. // Check version, chain id
  214. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  215. sconn.Close()
  216. return nil, err
  217. }
  218. peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  219. // Add the peer to .peers
  220. // ignore if duplicate or if we already have too many for that IP range
  221. if err := sw.peers.Add(peer); err != nil {
  222. log.Notice("Ignoring peer", "error", err, "peer", peer)
  223. peer.Stop()
  224. return nil, err
  225. }
  226. // remove deadline and start peer
  227. conn.SetDeadline(time.Time{})
  228. if sw.IsRunning() {
  229. sw.startInitPeer(peer)
  230. }
  231. log.Notice("Added peer", "peer", peer)
  232. return peer, nil
  233. }
  234. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  235. if sw.filterConnByAddr != nil {
  236. return sw.filterConnByAddr(addr)
  237. }
  238. return nil
  239. }
  240. func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
  241. if sw.filterConnByPubKey != nil {
  242. return sw.filterConnByPubKey(pubkey)
  243. }
  244. return nil
  245. }
  246. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  247. sw.filterConnByAddr = f
  248. }
  249. func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
  250. sw.filterConnByPubKey = f
  251. }
  252. func (sw *Switch) startInitPeer(peer *Peer) {
  253. peer.Start() // spawn send/recv routines
  254. sw.addPeerToReactors(peer) // run AddPeer on each reactor
  255. }
  256. // Dial a list of seeds asynchronously in random order
  257. func (sw *Switch) DialSeeds(seeds []string) error {
  258. netAddrs, err := NewNetAddressStrings(seeds)
  259. if err != nil {
  260. return err
  261. }
  262. // permute the list, dial them in random order.
  263. perm := rand.Perm(len(seeds))
  264. for i := 0; i < len(perm); i++ {
  265. go func(i int) {
  266. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  267. j := perm[i]
  268. sw.dialSeed(netAddrs[j])
  269. }(i)
  270. }
  271. return nil
  272. }
  273. func (sw *Switch) dialSeed(addr *NetAddress) {
  274. peer, err := sw.DialPeerWithAddress(addr)
  275. if err != nil {
  276. log.Error("Error dialing seed", "error", err)
  277. return
  278. } else {
  279. log.Notice("Connected to seed", "peer", peer)
  280. }
  281. }
  282. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  283. log.Info("Dialing address", "address", addr)
  284. sw.dialing.Set(addr.IP.String(), addr)
  285. conn, err := addr.DialTimeout(time.Duration(
  286. sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
  287. sw.dialing.Delete(addr.IP.String())
  288. if err != nil {
  289. log.Info("Failed dialing address", "address", addr, "error", err)
  290. return nil, err
  291. }
  292. if sw.config.GetBool(configFuzzEnable) {
  293. conn = FuzzConn(sw.config, conn)
  294. }
  295. peer, err := sw.AddPeerWithConnection(conn, true)
  296. if err != nil {
  297. log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
  298. return nil, err
  299. }
  300. log.Notice("Dialed and added peer", "address", addr, "peer", peer)
  301. return peer, nil
  302. }
  303. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  304. return sw.dialing.Has(addr.IP.String())
  305. }
  306. // Broadcast runs a go routine for each attempted send, which will block
  307. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  308. // which receives success values for each attempted send (false if times out)
  309. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  310. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
  311. successChan := make(chan bool, len(sw.peers.List()))
  312. log.Debug("Broadcast", "channel", chID, "msg", msg)
  313. for _, peer := range sw.peers.List() {
  314. go func(peer *Peer) {
  315. success := peer.Send(chID, msg)
  316. successChan <- success
  317. }(peer)
  318. }
  319. return successChan
  320. }
  321. // Returns the count of outbound/inbound and outbound-dialing peers.
  322. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  323. peers := sw.peers.List()
  324. for _, peer := range peers {
  325. if peer.outbound {
  326. outbound++
  327. } else {
  328. inbound++
  329. }
  330. }
  331. dialing = sw.dialing.Size()
  332. return
  333. }
  334. func (sw *Switch) Peers() IPeerSet {
  335. return sw.peers
  336. }
  337. // Disconnect from a peer due to external error.
  338. // TODO: make record depending on reason.
  339. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  340. log.Notice("Stopping peer for error", "peer", peer, "error", reason)
  341. sw.peers.Remove(peer)
  342. peer.Stop()
  343. sw.removePeerFromReactors(peer, reason)
  344. }
  345. // Disconnect from a peer gracefully.
  346. // TODO: handle graceful disconnects.
  347. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  348. log.Notice("Stopping peer gracefully")
  349. sw.peers.Remove(peer)
  350. peer.Stop()
  351. sw.removePeerFromReactors(peer, nil)
  352. }
  353. func (sw *Switch) addPeerToReactors(peer *Peer) {
  354. for _, reactor := range sw.reactors {
  355. reactor.AddPeer(peer)
  356. }
  357. }
  358. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  359. for _, reactor := range sw.reactors {
  360. reactor.RemovePeer(peer, reason)
  361. }
  362. }
  363. func (sw *Switch) listenerRoutine(l Listener) {
  364. for {
  365. inConn, ok := <-l.Connections()
  366. if !ok {
  367. break
  368. }
  369. // ignore connection if we already have enough
  370. maxPeers := sw.config.GetInt(configKeyMaxNumPeers)
  371. if maxPeers <= sw.peers.Size() {
  372. log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  373. continue
  374. }
  375. if sw.config.GetBool(configFuzzEnable) {
  376. inConn = FuzzConn(sw.config, inConn)
  377. }
  378. // New inbound connection!
  379. _, err := sw.AddPeerWithConnection(inConn, false)
  380. if err != nil {
  381. log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
  382. continue
  383. }
  384. // NOTE: We don't yet have the listening port of the
  385. // remote (if they have a listener at all).
  386. // The peerHandshake will handle that
  387. }
  388. // cleanup
  389. }
  390. //-----------------------------------------------------------------------------
  391. type SwitchEventNewPeer struct {
  392. Peer *Peer
  393. }
  394. type SwitchEventDonePeer struct {
  395. Peer *Peer
  396. Error interface{}
  397. }
  398. //------------------------------------------------------------------
  399. // Switches connected via arbitrary net.Conn; useful for testing
  400. // Returns n switches, connected according to the connect func.
  401. // If connect==Connect2Switches, the switches will be fully connected.
  402. // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
  403. // NOTE: panics if any switch fails to start.
  404. func MakeConnectedSwitches(n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
  405. switches := make([]*Switch, n)
  406. for i := 0; i < n; i++ {
  407. switches[i] = makeSwitch(i, "testing", "123.123.123", initSwitch)
  408. }
  409. if err := StartSwitches(switches); err != nil {
  410. panic(err)
  411. }
  412. for i := 0; i < n; i++ {
  413. for j := i; j < n; j++ {
  414. connect(switches, i, j)
  415. }
  416. }
  417. return switches
  418. }
  // PanicOnAddPeerErr makes Connect2Switches panic when either side fails to
  // add the peer, instead of ignoring the error. It is off by default.
  var PanicOnAddPeerErr bool
  420. // Will connect switches i and j via net.Pipe()
  421. // Blocks until a conection is established.
  422. // NOTE: caller ensures i and j are within bounds
  423. func Connect2Switches(switches []*Switch, i, j int) {
  424. switchI := switches[i]
  425. switchJ := switches[j]
  426. c1, c2 := net.Pipe()
  427. doneCh := make(chan struct{})
  428. go func() {
  429. _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
  430. if PanicOnAddPeerErr && err != nil {
  431. panic(err)
  432. }
  433. doneCh <- struct{}{}
  434. }()
  435. go func() {
  436. _, err := switchJ.AddPeerWithConnection(c2, true)
  437. if PanicOnAddPeerErr && err != nil {
  438. panic(err)
  439. }
  440. doneCh <- struct{}{}
  441. }()
  442. <-doneCh
  443. <-doneCh
  444. }
  445. func StartSwitches(switches []*Switch) error {
  446. for _, s := range switches {
  447. _, err := s.Start() // start switch and reactors
  448. if err != nil {
  449. return err
  450. }
  451. }
  452. return nil
  453. }
  454. func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
  455. privKey := crypto.GenPrivKeyEd25519()
  456. // new switch, add reactors
  457. // TODO: let the config be passed in?
  458. s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil)))
  459. s.SetNodeInfo(&NodeInfo{
  460. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  461. Moniker: Fmt("switch%d", i),
  462. Network: network,
  463. Version: version,
  464. })
  465. s.SetNodePrivKey(privKey)
  466. return s
  467. }