|
|
- package p2p
-
- import (
- "errors"
- "fmt"
- "math/rand"
- "net"
- "time"
-
- crypto "github.com/tendermint/go-crypto"
- cfg "github.com/tendermint/tendermint/config"
- cmn "github.com/tendermint/tmlibs/common"
- )
-
// Reconnection policy used when a persistent peer is stopped for an error.
const (
	// reconnectAttempts bounds how many times a persistent peer is redialed.
	reconnectAttempts = 30
	// reconnectInterval is the pause after a failed redial attempt.
	reconnectInterval = 3 * time.Second
)
-
- type Reactor interface {
- cmn.Service // Start, Stop
-
- SetSwitch(*Switch)
- GetChannels() []*ChannelDescriptor
- AddPeer(peer Peer)
- RemovePeer(peer Peer, reason interface{})
- Receive(chID byte, peer Peer, msgBytes []byte)
- }
-
- //--------------------------------------
-
- type BaseReactor struct {
- cmn.BaseService // Provides Start, Stop, .Quit
- Switch *Switch
- }
-
- func NewBaseReactor(name string, impl Reactor) *BaseReactor {
- return &BaseReactor{
- BaseService: *cmn.NewBaseService(nil, name, impl),
- Switch: nil,
- }
- }
-
- func (br *BaseReactor) SetSwitch(sw *Switch) {
- br.Switch = sw
- }
- func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
- func (_ *BaseReactor) AddPeer(peer Peer) {}
- func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
- func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
-
- //-----------------------------------------------------------------------------
-
- /*
- The `Switch` handles peer connections and exposes an API to receive incoming messages
- on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
- or more `Channels`. So while sending outgoing messages is typically performed on the peer,
- incoming messages are received on the reactor.
- */
- type Switch struct {
- cmn.BaseService
-
- config *cfg.P2PConfig
- peerConfig *PeerConfig
- listeners []Listener
- reactors map[string]Reactor
- chDescs []*ChannelDescriptor
- reactorsByCh map[byte]Reactor
- peers *PeerSet
- dialing *cmn.CMap
- nodeInfo *NodeInfo // our node info
- nodePrivKey crypto.PrivKeyEd25519 // our node privkey
-
- filterConnByAddr func(net.Addr) error
- filterConnByPubKey func(crypto.PubKeyEd25519) error
- }
-
// ErrSwitchDuplicatePeer is returned by addPeer when a peer with the same key
// is already present in the switch's peer set.
var ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
-
- func NewSwitch(config *cfg.P2PConfig) *Switch {
- sw := &Switch{
- config: config,
- peerConfig: DefaultPeerConfig(),
- reactors: make(map[string]Reactor),
- chDescs: make([]*ChannelDescriptor, 0),
- reactorsByCh: make(map[byte]Reactor),
- peers: NewPeerSet(),
- dialing: cmn.NewCMap(),
- nodeInfo: nil,
- }
-
- // TODO: collapse the peerConfig into the config ?
- sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
- sw.peerConfig.MConfig.SendRate = config.SendRate
- sw.peerConfig.MConfig.RecvRate = config.RecvRate
- sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
-
- sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
- return sw
- }
-
- // AddReactor adds the given reactor to the switch.
- // NOTE: Not goroutine safe.
- func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
- // Validate the reactor.
- // No two reactors can share the same channel.
- reactorChannels := reactor.GetChannels()
- for _, chDesc := range reactorChannels {
- chID := chDesc.ID
- if sw.reactorsByCh[chID] != nil {
- cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
- }
- sw.chDescs = append(sw.chDescs, chDesc)
- sw.reactorsByCh[chID] = reactor
- }
- sw.reactors[name] = reactor
- reactor.SetSwitch(sw)
- return reactor
- }
-
- // Reactors returns a map of reactors registered on the switch.
- // NOTE: Not goroutine safe.
- func (sw *Switch) Reactors() map[string]Reactor {
- return sw.reactors
- }
-
- // Reactor returns the reactor with the given name.
- // NOTE: Not goroutine safe.
- func (sw *Switch) Reactor(name string) Reactor {
- return sw.reactors[name]
- }
-
- // AddListener adds the given listener to the switch for listening to incoming peer connections.
- // NOTE: Not goroutine safe.
- func (sw *Switch) AddListener(l Listener) {
- sw.listeners = append(sw.listeners, l)
- }
-
- // Listeners returns the list of listeners the switch listens on.
- // NOTE: Not goroutine safe.
- func (sw *Switch) Listeners() []Listener {
- return sw.listeners
- }
-
- // IsListening returns true if the switch has at least one listener.
- // NOTE: Not goroutine safe.
- func (sw *Switch) IsListening() bool {
- return len(sw.listeners) > 0
- }
-
- // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
- // NOTE: Not goroutine safe.
- func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
- sw.nodeInfo = nodeInfo
- }
-
- // NodeInfo returns the switch's NodeInfo.
- // NOTE: Not goroutine safe.
- func (sw *Switch) NodeInfo() *NodeInfo {
- return sw.nodeInfo
- }
-
- // SetNodePrivKey sets the switche's private key for authenticated encryption.
- // NOTE: Overwrites sw.nodeInfo.PubKey.
- // NOTE: Not goroutine safe.
- func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
- sw.nodePrivKey = nodePrivKey
- if sw.nodeInfo != nil {
- sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
- }
- }
-
- // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
- func (sw *Switch) OnStart() error {
- sw.BaseService.OnStart()
- // Start reactors
- for _, reactor := range sw.reactors {
- _, err := reactor.Start()
- if err != nil {
- return err
- }
- }
-
- // Start listeners
- for _, listener := range sw.listeners {
- go sw.listenerRoutine(listener)
- }
- return nil
- }
-
- // OnStop implements BaseService. It stops all listeners, peers, and reactors.
- func (sw *Switch) OnStop() {
- sw.BaseService.OnStop()
- // Stop listeners
- for _, listener := range sw.listeners {
- listener.Stop()
- }
- sw.listeners = nil
- // Stop peers
- for _, peer := range sw.peers.List() {
- peer.Stop()
- sw.peers.Remove(peer)
- }
- // Stop reactors
- for _, reactor := range sw.reactors {
- reactor.Stop()
- }
- }
-
- // addPeer checks the given peer's validity, performs a handshake, and adds the peer to the switch
- // and to all registered reactors.
- // NOTE: This performs a blocking handshake before the peer is added.
- // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
- func (sw *Switch) addPeer(peer *peer) error {
-
- if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
- return err
- }
-
- if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
- return err
- }
-
- if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
- return err
- }
-
- // Avoid self
- if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
- return errors.New("Ignoring connection from self")
- }
-
- // Check version, chain id
- if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil {
- return err
- }
-
- // Check for duplicate peer
- if sw.peers.Has(peer.Key()) {
- return ErrSwitchDuplicatePeer
-
- }
-
- // Start peer
- if sw.IsRunning() {
- sw.startInitPeer(peer)
- }
-
- // Add the peer to .peers.
- // We start it first so that a peer in the list is safe to Stop.
- // It should not err since we already checked peers.Has()
- if err := sw.peers.Add(peer); err != nil {
- return err
- }
-
- sw.Logger.Info("Added peer", "peer", peer)
- return nil
- }
-
- // FilterConnByAddr returns an error if connecting to the given address is forbidden.
- func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
- if sw.filterConnByAddr != nil {
- return sw.filterConnByAddr(addr)
- }
- return nil
- }
-
- // FilterConnByPubKey returns an error if connecting to the given public key is forbidden.
- func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
- if sw.filterConnByPubKey != nil {
- return sw.filterConnByPubKey(pubkey)
- }
- return nil
-
- }
-
- // SetAddrFilter sets the function for filtering connections by address.
- func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
- sw.filterConnByAddr = f
- }
-
- // SetPubKeyFilter sets the function for filtering connections by public key.
- func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
- sw.filterConnByPubKey = f
- }
-
- func (sw *Switch) startInitPeer(peer *peer) {
- peer.Start() // spawn send/recv routines
- for _, reactor := range sw.reactors {
- reactor.AddPeer(peer)
- }
- }
-
- // DialSeeds dials a list of seeds asynchronously in random order
- func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
-
- netAddrs, err := NewNetAddressStrings(seeds)
- if err != nil {
- return err
- }
-
- if addrBook != nil {
- // add seeds to `addrBook`
- ourAddrS := sw.nodeInfo.ListenAddr
- ourAddr, _ := NewNetAddressString(ourAddrS)
- for _, netAddr := range netAddrs {
- // do not add ourselves
- if netAddr.Equals(ourAddr) {
- continue
- }
- addrBook.AddAddress(netAddr, ourAddr)
- }
- addrBook.Save()
- }
-
- // permute the list, dial them in random order.
- perm := rand.Perm(len(netAddrs))
- for i := 0; i < len(perm); i++ {
- go func(i int) {
- time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
- j := perm[i]
- sw.dialSeed(netAddrs[j])
- }(i)
- }
- return nil
- }
-
- func (sw *Switch) dialSeed(addr *NetAddress) {
- peer, err := sw.DialPeerWithAddress(addr, true)
- if err != nil {
- sw.Logger.Error("Error dialing seed", "err", err)
- } else {
- sw.Logger.Info("Connected to seed", "peer", peer)
- }
- }
-
- // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully.
- // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
- func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) {
- sw.dialing.Set(addr.IP.String(), addr)
- defer sw.dialing.Delete(addr.IP.String())
-
- sw.Logger.Info("Dialing peer", "address", addr)
- peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
- if err != nil {
- sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
- return nil, err
- }
- peer.SetLogger(sw.Logger.With("peer", addr))
- if persistent {
- peer.makePersistent()
- }
- err = sw.addPeer(peer)
- if err != nil {
- sw.Logger.Error("Failed to add peer", "address", addr, "err", err)
- peer.CloseConn()
- return nil, err
- }
- sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
- return peer, nil
- }
-
- // IsDialing returns true if the switch is currently dialing the given address.
- func (sw *Switch) IsDialing(addr *NetAddress) bool {
- return sw.dialing.Has(addr.IP.String())
- }
-
- // Broadcast runs a go routine for each attempted send, which will block
- // trying to send for defaultSendTimeoutSeconds. Returns a channel
- // which receives success values for each attempted send (false if times out)
- // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
- // TODO: Something more intelligent.
- func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
- successChan := make(chan bool, len(sw.peers.List()))
- sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
- for _, peer := range sw.peers.List() {
- go func(peer Peer) {
- success := peer.Send(chID, msg)
- successChan <- success
- }(peer)
- }
- return successChan
- }
-
- // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
- func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
- peers := sw.peers.List()
- for _, peer := range peers {
- if peer.IsOutbound() {
- outbound++
- } else {
- inbound++
- }
- }
- dialing = sw.dialing.Size()
- return
- }
-
- // Peers returns the set of peers the switch is connected to.
- func (sw *Switch) Peers() IPeerSet {
- return sw.peers
- }
-
- // StopPeerForError disconnects from a peer due to external error.
- // If the peer is persistent, it will attempt to reconnect.
- // TODO: make record depending on reason.
- func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
- addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr)
- sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
- sw.stopAndRemovePeer(peer, reason)
-
- if peer.IsPersistent() {
- go func() {
- sw.Logger.Info("Reconnecting to peer", "peer", peer)
- for i := 1; i < reconnectAttempts; i++ {
- if !sw.IsRunning() {
- return
- }
-
- peer, err := sw.DialPeerWithAddress(addr, true)
- if err != nil {
- if i == reconnectAttempts {
- sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "err", err)
- return
- }
- sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err)
- time.Sleep(reconnectInterval)
- continue
- }
-
- sw.Logger.Info("Reconnected to peer", "peer", peer)
- return
- }
- }()
- }
- }
-
- // StopPeerGracefully disconnects from a peer gracefully.
- // TODO: handle graceful disconnects.
- func (sw *Switch) StopPeerGracefully(peer Peer) {
- sw.Logger.Info("Stopping peer gracefully")
- sw.stopAndRemovePeer(peer, nil)
- }
-
- func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
- sw.peers.Remove(peer)
- peer.Stop()
- for _, reactor := range sw.reactors {
- reactor.RemovePeer(peer, reason)
- }
- }
-
- func (sw *Switch) listenerRoutine(l Listener) {
- for {
- inConn, ok := <-l.Connections()
- if !ok {
- break
- }
-
- // ignore connection if we already have enough
- maxPeers := sw.config.MaxNumPeers
- if maxPeers <= sw.peers.Size() {
- sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
- continue
- }
-
- // New inbound connection!
- err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
- if err != nil {
- sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
- continue
- }
-
- // NOTE: We don't yet have the listening port of the
- // remote (if they have a listener at all).
- // The peerHandshake will handle that
- }
-
- // cleanup
- }
-
- //-----------------------------------------------------------------------------
-
- type SwitchEventNewPeer struct {
- Peer Peer
- }
-
- type SwitchEventDonePeer struct {
- Peer Peer
- Error interface{}
- }
-
- //------------------------------------------------------------------
- // Switches connected via arbitrary net.Conn; useful for testing
-
- // MakeConnectedSwitches returns n switches, connected according to the connect func.
- // If connect==Connect2Switches, the switches will be fully connected.
- // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
- // NOTE: panics if any switch fails to start.
- func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
- switches := make([]*Switch, n)
- for i := 0; i < n; i++ {
- switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
- }
-
- if err := StartSwitches(switches); err != nil {
- panic(err)
- }
-
- for i := 0; i < n; i++ {
- for j := i; j < n; j++ {
- connect(switches, i, j)
- }
- }
-
- return switches
- }
-
// PanicOnAddPeerErr makes Connect2Switches panic when adding a peer on
// either side of the pipe fails. It is off by default, in which case such
// errors are ignored.
var PanicOnAddPeerErr bool = false
-
- // Connect2Switches will connect switches i and j via net.Pipe()
- // Blocks until a conection is established.
- // NOTE: caller ensures i and j are within bounds
- func Connect2Switches(switches []*Switch, i, j int) {
- switchI := switches[i]
- switchJ := switches[j]
- c1, c2 := net.Pipe()
- doneCh := make(chan struct{})
- go func() {
- err := switchI.addPeerWithConnection(c1)
- if PanicOnAddPeerErr && err != nil {
- panic(err)
- }
- doneCh <- struct{}{}
- }()
- go func() {
- err := switchJ.addPeerWithConnection(c2)
- if PanicOnAddPeerErr && err != nil {
- panic(err)
- }
- doneCh <- struct{}{}
- }()
- <-doneCh
- <-doneCh
- }
-
- // StartSwitches calls sw.Start() for each given switch.
- // It returns the first encountered error.
- func StartSwitches(switches []*Switch) error {
- for _, s := range switches {
- _, err := s.Start() // start switch and reactors
- if err != nil {
- return err
- }
- }
- return nil
- }
-
- func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
- privKey := crypto.GenPrivKeyEd25519()
- // new switch, add reactors
- // TODO: let the config be passed in?
- s := initSwitch(i, NewSwitch(cfg))
- s.SetNodeInfo(&NodeInfo{
- PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
- Moniker: cmn.Fmt("switch%d", i),
- Network: network,
- Version: version,
- RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
- ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
- })
- s.SetNodePrivKey(privKey)
- return s
- }
-
- func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
- if err != nil {
- conn.Close()
- return err
- }
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.addPeer(peer); err != nil {
- conn.Close()
- return err
- }
-
- return nil
- }
-
- func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
- peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
- if err != nil {
- conn.Close()
- return err
- }
- peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
- if err = sw.addPeer(peer); err != nil {
- conn.Close()
- return err
- }
-
- return nil
- }
|