You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

574 lines
15 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-crypto"
  11. "github.com/tendermint/log15"
  12. )
  13. type Reactor interface {
  14. Service // Start, Stop
  15. SetSwitch(*Switch)
  16. GetChannels() []*ChannelDescriptor
  17. AddPeer(peer *Peer)
  18. RemovePeer(peer *Peer, reason interface{})
  19. Receive(chID byte, peer *Peer, msgBytes []byte)
  20. }
  21. //--------------------------------------
  22. type BaseReactor struct {
  23. BaseService // Provides Start, Stop, .Quit
  24. Switch *Switch
  25. }
  26. func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
  27. return &BaseReactor{
  28. BaseService: *NewBaseService(log, name, impl),
  29. Switch: nil,
  30. }
  31. }
  32. func (br *BaseReactor) SetSwitch(sw *Switch) {
  33. br.Switch = sw
  34. }
  35. func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  36. func (_ *BaseReactor) AddPeer(peer *Peer) {}
  37. func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  38. func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
  39. //-----------------------------------------------------------------------------
  40. /*
  41. The `Switch` handles peer connections and exposes an API to receive incoming messages
  42. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. incoming messages are received on the reactor.
  45. */
  46. type Switch struct {
  47. BaseService
  48. config cfg.Config
  49. listeners []Listener
  50. reactors map[string]Reactor
  51. chDescs []*ChannelDescriptor
  52. reactorsByCh map[byte]Reactor
  53. peers *PeerSet
  54. dialing *CMap
  55. nodeInfo *NodeInfo // our node info
  56. nodePrivKey crypto.PrivKeyEd25519 // our node privkey
  57. filterConnByAddr func(net.Addr) error
  58. filterConnByPubKey func(crypto.PubKeyEd25519) error
  59. }
  60. var (
  61. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  62. ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
  63. )
  64. func NewSwitch(config cfg.Config) *Switch {
  65. setConfigDefaults(config)
  66. sw := &Switch{
  67. config: config,
  68. reactors: make(map[string]Reactor),
  69. chDescs: make([]*ChannelDescriptor, 0),
  70. reactorsByCh: make(map[byte]Reactor),
  71. peers: NewPeerSet(),
  72. dialing: NewCMap(),
  73. nodeInfo: nil,
  74. }
  75. sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
  76. return sw
  77. }
  78. // Not goroutine safe.
  79. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  80. // Validate the reactor.
  81. // No two reactors can share the same channel.
  82. reactorChannels := reactor.GetChannels()
  83. for _, chDesc := range reactorChannels {
  84. chID := chDesc.ID
  85. if sw.reactorsByCh[chID] != nil {
  86. PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  87. }
  88. sw.chDescs = append(sw.chDescs, chDesc)
  89. sw.reactorsByCh[chID] = reactor
  90. }
  91. sw.reactors[name] = reactor
  92. reactor.SetSwitch(sw)
  93. return reactor
  94. }
  95. // Not goroutine safe.
  96. func (sw *Switch) Reactors() map[string]Reactor {
  97. return sw.reactors
  98. }
  99. // Not goroutine safe.
  100. func (sw *Switch) Reactor(name string) Reactor {
  101. return sw.reactors[name]
  102. }
  103. // Not goroutine safe.
  104. func (sw *Switch) AddListener(l Listener) {
  105. sw.listeners = append(sw.listeners, l)
  106. }
  107. // Not goroutine safe.
  108. func (sw *Switch) Listeners() []Listener {
  109. return sw.listeners
  110. }
  111. // Not goroutine safe.
  112. func (sw *Switch) IsListening() bool {
  113. return len(sw.listeners) > 0
  114. }
  115. // Not goroutine safe.
  116. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
  117. sw.nodeInfo = nodeInfo
  118. }
  119. // Not goroutine safe.
  120. func (sw *Switch) NodeInfo() *NodeInfo {
  121. return sw.nodeInfo
  122. }
  123. // Not goroutine safe.
  124. // NOTE: Overwrites sw.nodeInfo.PubKey
  125. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
  126. sw.nodePrivKey = nodePrivKey
  127. if sw.nodeInfo != nil {
  128. sw.nodeInfo.PubKey = nodePrivKey.PubKey().(crypto.PubKeyEd25519)
  129. }
  130. }
  131. // Switch.Start() starts all the reactors, peers, and listeners.
  132. func (sw *Switch) OnStart() error {
  133. sw.BaseService.OnStart()
  134. // Start reactors
  135. for _, reactor := range sw.reactors {
  136. _, err := reactor.Start()
  137. if err != nil {
  138. return err
  139. }
  140. }
  141. // Start peers
  142. for _, peer := range sw.peers.List() {
  143. sw.startInitPeer(peer)
  144. }
  145. // Start listeners
  146. for _, listener := range sw.listeners {
  147. go sw.listenerRoutine(listener)
  148. }
  149. return nil
  150. }
  151. func (sw *Switch) OnStop() {
  152. sw.BaseService.OnStop()
  153. // Stop listeners
  154. for _, listener := range sw.listeners {
  155. listener.Stop()
  156. }
  157. sw.listeners = nil
  158. // Stop peers
  159. for _, peer := range sw.peers.List() {
  160. peer.Stop()
  161. sw.peers.Remove(peer)
  162. }
  163. // Stop reactors
  164. for _, reactor := range sw.reactors {
  165. reactor.Stop()
  166. }
  167. }
  168. // NOTE: This performs a blocking handshake before the peer is added.
  169. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
  170. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  171. // Filter by addr (ie. ip:port)
  172. if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
  173. conn.Close()
  174. return nil, err
  175. }
  176. // Set deadline for handshake so we don't block forever on conn.ReadFull
  177. conn.SetDeadline(time.Now().Add(
  178. time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
  179. // First, encrypt the connection.
  180. var sconn net.Conn = conn
  181. if sw.config.GetBool(configKeyAuthEnc) {
  182. var err error
  183. sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
  184. if err != nil {
  185. conn.Close()
  186. return nil, err
  187. }
  188. }
  189. // Filter by p2p-key
  190. if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
  191. sconn.Close()
  192. return nil, err
  193. }
  194. // Then, perform node handshake
  195. peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
  196. if err != nil {
  197. sconn.Close()
  198. return nil, err
  199. }
  200. if sw.config.GetBool(configKeyAuthEnc) {
  201. // Check that the professed PubKey matches the sconn's.
  202. if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
  203. sconn.Close()
  204. return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
  205. peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey())
  206. }
  207. }
  208. // Avoid self
  209. if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
  210. sconn.Close()
  211. return nil, fmt.Errorf("Ignoring connection from self")
  212. }
  213. // Check version, chain id
  214. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  215. sconn.Close()
  216. return nil, err
  217. }
  218. peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  219. // Add the peer to .peers
  220. // ignore if duplicate or if we already have too many for that IP range
  221. if err := sw.peers.Add(peer); err != nil {
  222. log.Notice("Ignoring peer", "error", err, "peer", peer)
  223. peer.Stop()
  224. return nil, err
  225. }
  226. // remove deadline and start peer
  227. conn.SetDeadline(time.Time{})
  228. if sw.IsRunning() {
  229. sw.startInitPeer(peer)
  230. }
  231. log.Notice("Added peer", "peer", peer)
  232. return peer, nil
  233. }
  234. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  235. if sw.filterConnByAddr != nil {
  236. return sw.filterConnByAddr(addr)
  237. }
  238. return nil
  239. }
  240. func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
  241. if sw.filterConnByPubKey != nil {
  242. return sw.filterConnByPubKey(pubkey)
  243. }
  244. return nil
  245. }
  246. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  247. sw.filterConnByAddr = f
  248. }
  249. func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
  250. sw.filterConnByPubKey = f
  251. }
  252. func (sw *Switch) startInitPeer(peer *Peer) {
  253. peer.Start() // spawn send/recv routines
  254. sw.addPeerToReactors(peer) // run AddPeer on each reactor
  255. }
  256. //error type for seed errors
  257. type SeedError struct {
  258. seed string
  259. err error
  260. }
  261. type SeedErrors []SeedError
  262. func (se SeedErrors) Error() string {
  263. var str string
  264. for _, e := range se {
  265. str += ("seed: " + e.seed + " error: " + e.err.Error() + "; ")
  266. }
  267. return str
  268. }
  269. // Dial a list of seeds in random order
  270. func (sw *Switch) DialSeeds(seeds []string) error {
  271. ch := make(chan SeedError) //channel for collecting errors
  272. passing := 0 //number of passing seeds
  273. // permute the list, dial them in random order.
  274. perm := rand.Perm(len(seeds))
  275. for i := 0; i < len(perm); i++ {
  276. go func(i int) {
  277. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  278. j := perm[i]
  279. addr, err := NewNetAddressString(seeds[j])
  280. if err != nil {
  281. ch <- SeedError{seeds[j], err}
  282. } else {
  283. sw.dialSeed(addr)
  284. passing++
  285. }
  286. }(i)
  287. }
  288. //collect any errors from the channel
  289. var seedErrs SeedErrors
  290. for {
  291. seedErr := <-ch
  292. seedErrs = append(seedErrs, seedErr)
  293. if len(seedErrs)+passing == len(perm) {
  294. break
  295. }
  296. }
  297. return seedErrs
  298. }
  299. func (sw *Switch) dialSeed(addr *NetAddress) {
  300. peer, err := sw.DialPeerWithAddress(addr)
  301. if err != nil {
  302. log.Error("Error dialing seed", "error", err)
  303. return
  304. } else {
  305. log.Notice("Connected to seed", "peer", peer)
  306. }
  307. }
  308. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  309. log.Info("Dialing address", "address", addr)
  310. sw.dialing.Set(addr.IP.String(), addr)
  311. conn, err := addr.DialTimeout(time.Duration(
  312. sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
  313. sw.dialing.Delete(addr.IP.String())
  314. if err != nil {
  315. log.Info("Failed dialing address", "address", addr, "error", err)
  316. return nil, err
  317. }
  318. if sw.config.GetBool(configFuzzEnable) {
  319. conn = FuzzConn(sw.config, conn)
  320. }
  321. peer, err := sw.AddPeerWithConnection(conn, true)
  322. if err != nil {
  323. log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
  324. return nil, err
  325. }
  326. log.Notice("Dialed and added peer", "address", addr, "peer", peer)
  327. return peer, nil
  328. }
  329. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  330. return sw.dialing.Has(addr.IP.String())
  331. }
  332. // Broadcast runs a go routine for each attempted send, which will block
  333. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  334. // which receives success values for each attempted send (false if times out)
  335. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  336. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
  337. successChan := make(chan bool, len(sw.peers.List()))
  338. log.Debug("Broadcast", "channel", chID, "msg", msg)
  339. for _, peer := range sw.peers.List() {
  340. go func(peer *Peer) {
  341. success := peer.Send(chID, msg)
  342. successChan <- success
  343. }(peer)
  344. }
  345. return successChan
  346. }
  347. // Returns the count of outbound/inbound and outbound-dialing peers.
  348. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  349. peers := sw.peers.List()
  350. for _, peer := range peers {
  351. if peer.outbound {
  352. outbound++
  353. } else {
  354. inbound++
  355. }
  356. }
  357. dialing = sw.dialing.Size()
  358. return
  359. }
  360. func (sw *Switch) Peers() IPeerSet {
  361. return sw.peers
  362. }
  363. // Disconnect from a peer due to external error.
  364. // TODO: make record depending on reason.
  365. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  366. log.Notice("Stopping peer for error", "peer", peer, "error", reason)
  367. sw.peers.Remove(peer)
  368. peer.Stop()
  369. sw.removePeerFromReactors(peer, reason)
  370. }
  371. // Disconnect from a peer gracefully.
  372. // TODO: handle graceful disconnects.
  373. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  374. log.Notice("Stopping peer gracefully")
  375. sw.peers.Remove(peer)
  376. peer.Stop()
  377. sw.removePeerFromReactors(peer, nil)
  378. }
  379. func (sw *Switch) addPeerToReactors(peer *Peer) {
  380. for _, reactor := range sw.reactors {
  381. reactor.AddPeer(peer)
  382. }
  383. }
  384. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  385. for _, reactor := range sw.reactors {
  386. reactor.RemovePeer(peer, reason)
  387. }
  388. }
  389. func (sw *Switch) listenerRoutine(l Listener) {
  390. for {
  391. inConn, ok := <-l.Connections()
  392. if !ok {
  393. break
  394. }
  395. // ignore connection if we already have enough
  396. maxPeers := sw.config.GetInt(configKeyMaxNumPeers)
  397. if maxPeers <= sw.peers.Size() {
  398. log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  399. continue
  400. }
  401. if sw.config.GetBool(configFuzzEnable) {
  402. inConn = FuzzConn(sw.config, inConn)
  403. }
  404. // New inbound connection!
  405. _, err := sw.AddPeerWithConnection(inConn, false)
  406. if err != nil {
  407. log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
  408. continue
  409. }
  410. // NOTE: We don't yet have the listening port of the
  411. // remote (if they have a listener at all).
  412. // The peerHandshake will handle that
  413. }
  414. // cleanup
  415. }
  416. //-----------------------------------------------------------------------------
  417. type SwitchEventNewPeer struct {
  418. Peer *Peer
  419. }
  420. type SwitchEventDonePeer struct {
  421. Peer *Peer
  422. Error interface{}
  423. }
  424. //------------------------------------------------------------------
  425. // Switches connected via arbitrary net.Conn; useful for testing
  426. // Returns n switches, connected according to the connect func.
  427. // If connect==Connect2Switches, the switches will be fully connected.
  428. // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
  429. // NOTE: panics if any switch fails to start.
  430. func MakeConnectedSwitches(n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
  431. switches := make([]*Switch, n)
  432. for i := 0; i < n; i++ {
  433. switches[i] = makeSwitch(i, "testing", "123.123.123", initSwitch)
  434. }
  435. if err := StartSwitches(switches); err != nil {
  436. panic(err)
  437. }
  438. for i := 0; i < n; i++ {
  439. for j := i; j < n; j++ {
  440. connect(switches, i, j)
  441. }
  442. }
  443. return switches
  444. }
  445. var PanicOnAddPeerErr = false
  446. // Will connect switches i and j via net.Pipe()
  447. // Blocks until a conection is established.
  448. // NOTE: caller ensures i and j are within bounds
  449. func Connect2Switches(switches []*Switch, i, j int) {
  450. switchI := switches[i]
  451. switchJ := switches[j]
  452. c1, c2 := net.Pipe()
  453. doneCh := make(chan struct{})
  454. go func() {
  455. _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
  456. if PanicOnAddPeerErr && err != nil {
  457. panic(err)
  458. }
  459. doneCh <- struct{}{}
  460. }()
  461. go func() {
  462. _, err := switchJ.AddPeerWithConnection(c2, true)
  463. if PanicOnAddPeerErr && err != nil {
  464. panic(err)
  465. }
  466. doneCh <- struct{}{}
  467. }()
  468. <-doneCh
  469. <-doneCh
  470. }
  471. func StartSwitches(switches []*Switch) error {
  472. for _, s := range switches {
  473. _, err := s.Start() // start switch and reactors
  474. if err != nil {
  475. return err
  476. }
  477. }
  478. return nil
  479. }
  480. func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
  481. privKey := crypto.GenPrivKeyEd25519()
  482. // new switch, add reactors
  483. // TODO: let the config be passed in?
  484. s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil)))
  485. s.SetNodeInfo(&NodeInfo{
  486. PubKey: privKey.PubKey().(crypto.PubKeyEd25519),
  487. Moniker: Fmt("switch%d", i),
  488. Network: network,
  489. Version: version,
  490. })
  491. s.SetNodePrivKey(privKey)
  492. return s
  493. }