You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

401 lines
10 KiB

9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-crypto"
  10. "github.com/tendermint/log15"
  11. )
  12. type Reactor interface {
  13. Service // Start, Stop
  14. SetSwitch(*Switch)
  15. GetChannels() []*ChannelDescriptor
  16. AddPeer(peer *Peer)
  17. RemovePeer(peer *Peer, reason interface{})
  18. Receive(chID byte, peer *Peer, msgBytes []byte)
  19. }
  20. //--------------------------------------
  21. type BaseReactor struct {
  22. QuitService // Provides Start, Stop, .Quit
  23. Switch *Switch
  24. }
  25. func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
  26. return &BaseReactor{
  27. QuitService: *NewQuitService(log, name, impl),
  28. Switch: nil,
  29. }
  30. }
  31. func (br *BaseReactor) SetSwitch(sw *Switch) {
  32. br.Switch = sw
  33. }
  34. func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
  35. func (_ *BaseReactor) AddPeer(peer *Peer) {}
  36. func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
  37. func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
  38. //-----------------------------------------------------------------------------
  39. /*
  40. The `Switch` handles peer connections and exposes an API to receive incoming messages
  41. on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  42. or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  43. incoming messages are received on the reactor.
  44. */
  45. type Switch struct {
  46. BaseService
  47. listeners []Listener
  48. reactors map[string]Reactor
  49. chDescs []*ChannelDescriptor
  50. reactorsByCh map[byte]Reactor
  51. peers *PeerSet
  52. dialing *CMap
  53. nodeInfo *NodeInfo // our node info
  54. nodePrivKey crypto.PrivKeyEd25519 // our node privkey
  55. }
  56. var (
  57. ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
  58. ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
  59. )
  60. const (
  61. peerDialTimeoutSeconds = 3 // TODO make this configurable
  62. handshakeTimeoutSeconds = 20 // TODO make this configurable
  63. maxNumPeers = 50 // TODO make this configurable
  64. )
  65. func NewSwitch() *Switch {
  66. sw := &Switch{
  67. reactors: make(map[string]Reactor),
  68. chDescs: make([]*ChannelDescriptor, 0),
  69. reactorsByCh: make(map[byte]Reactor),
  70. peers: NewPeerSet(),
  71. dialing: NewCMap(),
  72. nodeInfo: nil,
  73. }
  74. sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
  75. return sw
  76. }
  77. // Not goroutine safe.
  78. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  79. // Validate the reactor.
  80. // No two reactors can share the same channel.
  81. reactorChannels := reactor.GetChannels()
  82. for _, chDesc := range reactorChannels {
  83. chID := chDesc.ID
  84. if sw.reactorsByCh[chID] != nil {
  85. PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  86. }
  87. sw.chDescs = append(sw.chDescs, chDesc)
  88. sw.reactorsByCh[chID] = reactor
  89. }
  90. sw.reactors[name] = reactor
  91. reactor.SetSwitch(sw)
  92. return reactor
  93. }
  94. // Not goroutine safe.
  95. func (sw *Switch) Reactors() map[string]Reactor {
  96. return sw.reactors
  97. }
  98. // Not goroutine safe.
  99. func (sw *Switch) Reactor(name string) Reactor {
  100. return sw.reactors[name]
  101. }
  102. // Not goroutine safe.
  103. func (sw *Switch) AddListener(l Listener) {
  104. sw.listeners = append(sw.listeners, l)
  105. }
  106. // Not goroutine safe.
  107. func (sw *Switch) Listeners() []Listener {
  108. return sw.listeners
  109. }
  110. // Not goroutine safe.
  111. func (sw *Switch) IsListening() bool {
  112. return len(sw.listeners) > 0
  113. }
  114. // Not goroutine safe.
  115. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
  116. sw.nodeInfo = nodeInfo
  117. }
  118. // Not goroutine safe.
  119. func (sw *Switch) NodeInfo() *NodeInfo {
  120. return sw.nodeInfo
  121. }
  122. // Not goroutine safe.
  123. // NOTE: Overwrites sw.nodeInfo.PubKey
  124. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
  125. sw.nodePrivKey = nodePrivKey
  126. if sw.nodeInfo != nil {
  127. sw.nodeInfo.PubKey = nodePrivKey.PubKey().(crypto.PubKeyEd25519)
  128. }
  129. }
  130. // Switch.Start() starts all the reactors, peers, and listeners.
  131. func (sw *Switch) OnStart() error {
  132. sw.BaseService.OnStart()
  133. // Start reactors
  134. for _, reactor := range sw.reactors {
  135. _, err := reactor.Start()
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. // Start peers
  141. for _, peer := range sw.peers.List() {
  142. sw.startInitPeer(peer)
  143. }
  144. // Start listeners
  145. for _, listener := range sw.listeners {
  146. go sw.listenerRoutine(listener)
  147. }
  148. return nil
  149. }
  150. func (sw *Switch) OnStop() {
  151. sw.BaseService.OnStop()
  152. // Stop listeners
  153. for _, listener := range sw.listeners {
  154. listener.Stop()
  155. }
  156. sw.listeners = nil
  157. // Stop peers
  158. for _, peer := range sw.peers.List() {
  159. peer.Stop()
  160. }
  161. sw.peers = NewPeerSet()
  162. // Stop reactors
  163. for _, reactor := range sw.reactors {
  164. reactor.Stop()
  165. }
  166. }
  167. // NOTE: This performs a blocking handshake before the peer is added.
  168. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
  169. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
  170. // Set deadline for handshake so we don't block forever on conn.ReadFull
  171. conn.SetDeadline(time.Now().Add(handshakeTimeoutSeconds * time.Second))
  172. // First, encrypt the connection.
  173. sconn, err := MakeSecretConnection(conn, sw.nodePrivKey)
  174. if err != nil {
  175. conn.Close()
  176. return nil, err
  177. }
  178. // Then, perform node handshake
  179. peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
  180. if err != nil {
  181. sconn.Close()
  182. return nil, err
  183. }
  184. // Check that the professed PubKey matches the sconn's.
  185. if !peerNodeInfo.PubKey.Equals(sconn.RemotePubKey()) {
  186. sconn.Close()
  187. return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
  188. peerNodeInfo.PubKey, sconn.RemotePubKey())
  189. }
  190. // Avoid self
  191. if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
  192. sconn.Close()
  193. return nil, fmt.Errorf("Ignoring connection from self")
  194. }
  195. // Check version, chain id
  196. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  197. sconn.Close()
  198. return nil, err
  199. }
  200. peer := newPeer(sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  201. // Add the peer to .peers
  202. // ignore if duplicate or if we already have too many for that IP range
  203. if err := sw.peers.Add(peer); err != nil {
  204. log.Notice("Ignoring peer", "error", err, "peer", peer)
  205. peer.Stop()
  206. return nil, err
  207. }
  208. // remove deadline and start peer
  209. conn.SetDeadline(time.Time{})
  210. if sw.IsRunning() {
  211. sw.startInitPeer(peer)
  212. }
  213. log.Notice("Added peer", "peer", peer)
  214. return peer, nil
  215. }
  216. func (sw *Switch) startInitPeer(peer *Peer) {
  217. peer.Start() // spawn send/recv routines
  218. sw.addPeerToReactors(peer) // run AddPeer on each reactor
  219. }
  220. // Dial a list of seeds in random order
  221. // Spawns a go routine for each dial
  222. func (sw *Switch) DialSeeds(seeds []string) {
  223. // permute the list, dial them in random order.
  224. perm := rand.Perm(len(seeds))
  225. for i := 0; i < len(perm); i++ {
  226. go func(i int) {
  227. time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
  228. j := perm[i]
  229. addr := NewNetAddressString(seeds[j])
  230. sw.dialSeed(addr)
  231. }(i)
  232. }
  233. }
  234. func (sw *Switch) dialSeed(addr *NetAddress) {
  235. peer, err := sw.DialPeerWithAddress(addr)
  236. if err != nil {
  237. log.Error("Error dialing seed", "error", err)
  238. return
  239. } else {
  240. log.Notice("Connected to seed", "peer", peer)
  241. }
  242. }
  243. func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
  244. log.Info("Dialing address", "address", addr)
  245. sw.dialing.Set(addr.IP.String(), addr)
  246. conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
  247. sw.dialing.Delete(addr.IP.String())
  248. if err != nil {
  249. log.Info("Failed dialing address", "address", addr, "error", err)
  250. return nil, err
  251. }
  252. peer, err := sw.AddPeerWithConnection(conn, true)
  253. if err != nil {
  254. log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
  255. return nil, err
  256. }
  257. log.Notice("Dialed and added peer", "address", addr, "peer", peer)
  258. return peer, nil
  259. }
  260. func (sw *Switch) IsDialing(addr *NetAddress) bool {
  261. return sw.dialing.Has(addr.IP.String())
  262. }
  263. // Broadcast runs a go routine for each attempted send, which will block
  264. // trying to send for defaultSendTimeoutSeconds. Returns a channel
  265. // which receives success values for each attempted send (false if times out)
  266. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
  267. successChan := make(chan bool, len(sw.peers.List()))
  268. log.Info("Broadcast", "channel", chID, "msg", msg)
  269. for _, peer := range sw.peers.List() {
  270. go func(peer *Peer) {
  271. success := peer.Send(chID, msg)
  272. successChan <- success
  273. }(peer)
  274. }
  275. return successChan
  276. }
  277. // Returns the count of outbound/inbound and outbound-dialing peers.
  278. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  279. peers := sw.peers.List()
  280. for _, peer := range peers {
  281. if peer.outbound {
  282. outbound++
  283. } else {
  284. inbound++
  285. }
  286. }
  287. dialing = sw.dialing.Size()
  288. return
  289. }
  290. func (sw *Switch) Peers() IPeerSet {
  291. return sw.peers
  292. }
  293. // Disconnect from a peer due to external error.
  294. // TODO: make record depending on reason.
  295. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
  296. log.Notice("Stopping peer for error", "peer", peer, "error", reason)
  297. sw.peers.Remove(peer)
  298. peer.Stop()
  299. sw.removePeerFromReactors(peer, reason)
  300. }
  301. // Disconnect from a peer gracefully.
  302. // TODO: handle graceful disconnects.
  303. func (sw *Switch) StopPeerGracefully(peer *Peer) {
  304. log.Notice("Stopping peer gracefully")
  305. sw.peers.Remove(peer)
  306. peer.Stop()
  307. sw.removePeerFromReactors(peer, nil)
  308. }
  309. func (sw *Switch) addPeerToReactors(peer *Peer) {
  310. for _, reactor := range sw.reactors {
  311. reactor.AddPeer(peer)
  312. }
  313. }
  314. func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
  315. for _, reactor := range sw.reactors {
  316. reactor.RemovePeer(peer, reason)
  317. }
  318. }
  319. func (sw *Switch) listenerRoutine(l Listener) {
  320. for {
  321. inConn, ok := <-l.Connections()
  322. if !ok {
  323. break
  324. }
  325. // ignore connection if we already have enough
  326. if maxNumPeers <= sw.peers.Size() {
  327. log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxNumPeers)
  328. continue
  329. }
  330. // New inbound connection!
  331. _, err := sw.AddPeerWithConnection(inConn, false)
  332. if err != nil {
  333. log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
  334. continue
  335. }
  336. // NOTE: We don't yet have the listening port of the
  337. // remote (if they have a listener at all).
  338. // The peerHandshake will handle that
  339. }
  340. // cleanup
  341. }
  342. //-----------------------------------------------------------------------------
  343. type SwitchEventNewPeer struct {
  344. Peer *Peer
  345. }
  346. type SwitchEventDonePeer struct {
  347. Peer *Peer
  348. Error interface{}
  349. }