diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 0ef38a6c4..b63a057e1 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -42,7 +42,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe bcReactor.SetLogger(logger.With("module", "blockchain")) // Next: we need to set a switch in order for peers to be added in - bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) + bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil) // Lastly: let's add some blocks in for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0aba77432..3903e6b9e 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -39,7 +39,13 @@ func TestByzantine(t *testing.T) { switches := make([]*p2p.Switch, N) p2pLogger := logger.With("module", "p2p") for i := 0; i < N; i++ { - switches[i] = p2p.NewSwitch(config.P2P) + switches[i] = p2p.MakeSwitch( + config.P2P, + i, + "foo", "1.0.0", + func(i int, sw *p2p.Switch) *p2p.Switch { + return sw + }) switches[i].SetLogger(p2pLogger.With("validator", i)) } diff --git a/node/node.go b/node/node.go index c623e620f..995d1c889 100644 --- a/node/node.go +++ b/node/node.go @@ -126,9 +126,12 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - sw *p2p.Switch // p2p connections - addrBook pex.AddrBook // known peers - nodeKey *p2p.NodeKey // our node privkey + transport *p2p.MultiplexTransport + sw *p2p.Switch // p2p connections + addrBook pex.AddrBook // known peers + nodeInfo p2p.NodeInfo + nodeKey *p2p.NodeKey // our node privkey + isListening bool // services eventBus *types.EventBus // pub/sub for services @@ -301,14 +304,109 @@ func NewNode(config *cfg.Config, consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) consensusReactor.SetLogger(consensusLogger) - p2pLogger := logger.With("module", "p2p") + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + + // services which will be publishing and/or subscribing for messages (events) + // consensusReactor will set it on consensusState and blockExecutor + consensusReactor.SetEventBus(eventBus) - sw := p2p.NewSwitch(config.P2P, p2p.WithMetrics(p2pMetrics)) + // Transaction indexing + var txIndexer txindex.TxIndexer + switch config.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, err + } + if config.TxIndex.IndexTags != "" { + txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) + } else if config.TxIndex.IndexAllTags { + txIndexer = kv.NewTxIndex(store, kv.IndexAllTags()) + } else { + txIndexer = kv.NewTxIndex(store) + } + default: + txIndexer = &null.TxIndex{} + } + + indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) + + var ( + p2pLogger = logger.With("module", "p2p") + nodeInfo = makeNodeInfo(config, nodeKey.ID(), txIndexer, genDoc.ChainID) + ) + + // Setup Transport. + var ( + transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey) + connFilters = []p2p.ConnFilterFunc{} + peerFilters = []p2p.PeerFilterFunc{} + ) + + if !config.P2P.AllowDuplicateIP { + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + } + + // Filter peers by addr or pubkey with an ABCI query. + // If the query return code is OK, add peer. + // XXX: Query format subject to change + if config.FilterPeers { + connFilters = append( + connFilters, + // ABCI query for address filtering. + func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("Error querying abci app: %v", res) + } + + return nil + }, + ) + + peerFilters = append( + peerFilters, + // ABCI query for ID filtering. + func(_ p2p.IPeerSet, p p2p.Peer) error { + res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("Error querying abci app: %v", res) + } + + return nil + }, + ) + } + + p2p.MultiplexTransportConnFilters(connFilters...)(transport) + + // Setup Switch. + sw := p2p.NewSwitch( + config.P2P, + transport, + p2p.WithMetrics(p2pMetrics), + p2p.SwitchPeerFilters(peerFilters...), + ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) sw.AddReactor("EVIDENCE", evidenceReactor) + sw.SetNodeInfo(nodeInfo) + sw.SetNodeKey(nodeKey) + p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) // Optionally, start the pex reactor @@ -324,6 +422,10 @@ func NewNode(config *cfg.Config, // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. // Note we currently use the addrBook regardless at least for AddOurAddress addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) + + // Add ourselves to addrbook to prevent dialing ourselves + addrBook.AddOurAddress(nodeInfo.NetAddress()) + addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) if config.P2P.PexReactor { // TODO persistent peers ? so we can have their DNS addrs saved @@ -338,62 +440,6 @@ func NewNode(config *cfg.Config, sw.SetAddrBook(addrBook) - // Filter peers by addr or pubkey with an ABCI query. - // If the query return code is OK, add peer. - // XXX: Query format subject to change - if config.FilterPeers { - // NOTE: addr is ip:port - sw.SetAddrFilter(func(addr net.Addr) error { - resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: fmt.Sprintf("/p2p/filter/addr/%s", addr.String())}) - if err != nil { - return err - } - if resQuery.IsErr() { - return fmt.Errorf("Error querying abci app: %v", resQuery) - } - return nil - }) - sw.SetIDFilter(func(id p2p.ID) error { - resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: fmt.Sprintf("/p2p/filter/id/%s", id)}) - if err != nil { - return err - } - if resQuery.IsErr() { - return fmt.Errorf("Error querying abci app: %v", resQuery) - } - return nil - }) - } - - eventBus := types.NewEventBus() - eventBus.SetLogger(logger.With("module", "events")) - - // services which will be publishing and/or subscribing for messages (events) - // consensusReactor will set it on consensusState and blockExecutor - consensusReactor.SetEventBus(eventBus) - - // Transaction indexing - var txIndexer txindex.TxIndexer - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, err - } - if config.TxIndex.IndexTags != "" { - txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) - } else if config.TxIndex.IndexAllTags { - txIndexer = kv.NewTxIndex(store, kv.IndexAllTags()) - } else { - txIndexer = kv.NewTxIndex(store) - } - default: - txIndexer = &null.TxIndex{} - } - - indexerService := txindex.NewIndexerService(txIndexer, eventBus) - indexerService.SetLogger(logger.With("module", "txindex")) - // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { @@ -407,9 +453,11 @@ func NewNode(config *cfg.Config, genesisDoc: genDoc, privValidator: privValidator, - sw: sw, - addrBook: addrBook, - nodeKey: nodeKey, + transport: transport, + sw: sw, + addrBook: addrBook, + nodeInfo: nodeInfo, + nodeKey: nodeKey, stateDB: stateDB, blockStore: blockStore, @@ -441,21 +489,6 @@ func (n *Node) OnStart() error { return err } - // Create & add listener - l := p2p.NewDefaultListener( - n.config.P2P.ListenAddress, - n.config.P2P.ExternalAddress, - n.config.P2P.UPNP, - n.Logger.With("module", "p2p")) - n.sw.AddListener(l) - - nodeInfo := n.makeNodeInfo(n.nodeKey.ID()) - n.sw.SetNodeInfo(nodeInfo) - n.sw.SetNodeKey(n.nodeKey) - - // Add ourselves to addrbook to prevent dialing ourselves - n.addrBook.AddOurAddress(nodeInfo.NetAddress()) - // Add private IDs to addrbook to block those peers being added n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " ")) @@ -474,6 +507,17 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } + // Start the transport. + addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress) + if err != nil { + return err + } + if err := n.transport.Listen(*addr); err != nil { + return err + } + + n.isListening = true + // Start the switch (the P2P server). err = n.sw.Start() if err != nil { @@ -506,6 +550,12 @@ func (n *Node) OnStop() { // TODO: gracefully disconnect from peers. n.sw.Stop() + if err := n.transport.Close(); err != nil { + n.Logger.Error("Error closing transport", "err", err) + } + + n.isListening = false + // finally stop the listeners / external services for _, l := range n.rpcListeners { n.Logger.Info("Closing rpc listener", "listener", l) @@ -536,13 +586,6 @@ func (n *Node) RunForever() { }) } -// AddListener adds a listener to accept inbound peer connections. -// It should be called before starting the Node. -// The first listener is the primary listener (in NodeInfo) -func (n *Node) AddListener(l p2p.Listener) { - n.sw.AddListener(l) -} - // ConfigureRPC sets all variables in rpccore so they will serve // rpc calls from this node func (n *Node) ConfigureRPC() { @@ -551,7 +594,8 @@ func (n *Node) ConfigureRPC() { rpccore.SetConsensusState(n.consensusState) rpccore.SetMempool(n.mempoolReactor.Mempool) rpccore.SetEvidencePool(n.evidencePool) - rpccore.SetSwitch(n.sw) + rpccore.SetP2PPeers(n.sw) + rpccore.SetP2PTransport(n) rpccore.SetPubKey(n.privValidator.GetPubKey()) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetAddrBook(n.addrBook) @@ -683,14 +727,36 @@ func (n *Node) ProxyApp() proxy.AppConns { return n.proxyApp } -func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo { +//------------------------------------------------------------------------------ + +func (n *Node) Listeners() []string { + return []string{ + fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress), + } +} + +func (n *Node) IsListening() bool { + return n.isListening +} + +// NodeInfo returns the Node's Info from the Switch. +func (n *Node) NodeInfo() p2p.NodeInfo { + return n.nodeInfo +} + +func makeNodeInfo( + config *cfg.Config, + nodeID p2p.ID, + txIndexer txindex.TxIndexer, + chainID string, +) p2p.NodeInfo { txIndexerStatus := "on" - if _, ok := n.txIndexer.(*null.TxIndex); ok { + if _, ok := txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" } nodeInfo := p2p.NodeInfo{ ID: nodeID, - Network: n.genesisDoc.ChainID, + Network: chainID, Version: version.Version, Channels: []byte{ bc.BlockchainChannel, @@ -698,7 +764,7 @@ func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo { mempl.MempoolChannel, evidence.EvidenceChannel, }, - Moniker: n.config.Moniker, + Moniker: config.Moniker, Other: p2p.NodeInfoOther{ AminoVersion: amino.Version, P2PVersion: p2p.Version, @@ -708,34 +774,26 @@ func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo { }, } - if n.config.P2P.PexReactor { + if config.P2P.PexReactor { nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) } - rpcListenAddr := n.config.RPC.ListenAddress + rpcListenAddr := config.RPC.ListenAddress nodeInfo.Other.RPCAddress = rpcListenAddr - if !n.sw.IsListening() { - return nodeInfo + lAddr := config.P2P.ExternalAddress + + if lAddr == "" { + lAddr = config.P2P.ListenAddress } - p2pListener := n.sw.Listeners()[0] - p2pHost := p2pListener.ExternalAddressHost() - p2pPort := p2pListener.ExternalAddress().Port - nodeInfo.ListenAddr = fmt.Sprintf("%v:%v", p2pHost, p2pPort) + nodeInfo.ListenAddr = lAddr return nodeInfo } //------------------------------------------------------------------------------ -// NodeInfo returns the Node's Info from the Switch. -func (n *Node) NodeInfo() p2p.NodeInfo { - return n.sw.NodeInfo() -} - -//------------------------------------------------------------------------------ - var ( genesisDocKey = []byte("genesisDoc") ) diff --git a/p2p/errors.go b/p2p/errors.go index fc477d1c2..902d22034 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -5,6 +5,98 @@ import ( "net" ) +// ErrFilterTimeout indicates that a filter operation timed out. +type ErrFilterTimeout struct{} + +func (e ErrFilterTimeout) Error() string { + return "filter timed out" +} + +// ErrRejected indicates that a Peer was rejected carrying additional +// information as to the reason. +type ErrRejected struct { + addr NetAddress + conn net.Conn + err error + id ID + isAuthFailure bool + isDuplicate bool + isFiltered bool + isIncompatible bool + isNodeInfoInvalid bool + isSelf bool +} + +// Addr returns the NetAddress for the rejected Peer. +func (e ErrRejected) Addr() NetAddress { + return e.addr +} + +func (e ErrRejected) Error() string { + if e.isAuthFailure { + return fmt.Sprintf("auth failure: %s", e.err) + } + + if e.isDuplicate { + if e.conn != nil { + return fmt.Sprintf( + "duplicate CONN<%s>: %s", + e.conn.RemoteAddr().String(), + e.err, + ) + } + if e.id != "" { + return fmt.Sprintf("duplicate ID<%v>: %s", e.id, e.err) + } + } + + if e.isFiltered { + if e.conn != nil { + return fmt.Sprintf( + "filtered CONN<%s>: %s", + e.conn.RemoteAddr().String(), + e.err, + ) + } + + if e.id != "" { + return fmt.Sprintf("filtered ID<%v>: %s", e.id, e.err) + } + } + + if e.isIncompatible { + return fmt.Sprintf("incompatible: %s", e.err) + } + + if e.isNodeInfoInvalid { + return fmt.Sprintf("invalid NodeInfo: %s", e.err) + } + + if e.isSelf { + return fmt.Sprintf("self ID<%v>", e.id) + } + + return fmt.Sprintf("%s", e.err) +} + +// IsAuthFailure when Peer authentication was unsuccessful. +func (e ErrRejected) IsAuthFailure() bool { return e.isAuthFailure } + +// IsDuplicate when Peer ID or IP are present already. +func (e ErrRejected) IsDuplicate() bool { return e.isDuplicate } + +// IsFiltered when Peer ID or IP was filtered. +func (e ErrRejected) IsFiltered() bool { return e.isFiltered } + +// IsIncompatible when Peer NodeInfo is not compatible with our own. +func (e ErrRejected) IsIncompatible() bool { return e.isIncompatible } + +// IsNodeInfoInvalid when the sent NodeInfo is not valid. +func (e ErrRejected) IsNodeInfoInvalid() bool { return e.isNodeInfoInvalid } + +// IsSelf when Peer is our own node. +func (e ErrRejected) IsSelf() bool { return e.isSelf } + // ErrSwitchDuplicatePeerID to be raised when a peer is connecting with a known // ID. type ErrSwitchDuplicatePeerID struct { @@ -47,6 +139,13 @@ func (e ErrSwitchAuthenticationFailure) Error() string { ) } +// ErrTransportClosed is raised when the Transport has been closed. +type ErrTransportClosed struct{} + +func (e ErrTransportClosed) Error() string { + return "transport has been closed" +} + //------------------------------------------------------------------- type ErrNetAddressNoID struct { diff --git a/p2p/listener.go b/p2p/listener.go deleted file mode 100644 index d0dd3f42a..000000000 --- a/p2p/listener.go +++ /dev/null @@ -1,286 +0,0 @@ -package p2p - -import ( - "fmt" - "net" - "strconv" - "strings" - "time" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p/upnp" -) - -// Listener is a network listener for stream-oriented protocols, providing -// convenient methods to get listener's internal and external addresses. -// Clients are supposed to read incoming connections from a channel, returned -// by Connections() method. -type Listener interface { - Connections() <-chan net.Conn - InternalAddress() *NetAddress - ExternalAddress() *NetAddress - ExternalAddressHost() string - String() string - Stop() error -} - -// DefaultListener is a cmn.Service, running net.Listener underneath. -// Optionally, UPnP is used upon calling NewDefaultListener to resolve external -// address. -type DefaultListener struct { - cmn.BaseService - - listener net.Listener - intAddr *NetAddress - extAddr *NetAddress - connections chan net.Conn -} - -var _ Listener = (*DefaultListener)(nil) - -const ( - numBufferedConnections = 10 - defaultExternalPort = 8770 - tryListenSeconds = 5 -) - -func splitHostPort(addr string) (host string, port int) { - host, portStr, err := net.SplitHostPort(addr) - if err != nil { - panic(err) - } - port, err = strconv.Atoi(portStr) - if err != nil { - panic(err) - } - return host, port -} - -// NewDefaultListener creates a new DefaultListener on lAddr, optionally trying -// to determine external address using UPnP. -func NewDefaultListener( - fullListenAddrString string, - externalAddrString string, - useUPnP bool, - logger log.Logger) Listener { - - // Split protocol, address, and port. - protocol, lAddr := cmn.ProtocolAndAddress(fullListenAddrString) - lAddrIP, lAddrPort := splitHostPort(lAddr) - - // Create listener - var listener net.Listener - var err error - for i := 0; i < tryListenSeconds; i++ { - listener, err = net.Listen(protocol, lAddr) - if err == nil { - break - } else if i < tryListenSeconds-1 { - time.Sleep(time.Second * 1) - } - } - if err != nil { - panic(err) - } - // Actual listener local IP & port - listenerIP, listenerPort := splitHostPort(listener.Addr().String()) - logger.Info("Local listener", "ip", listenerIP, "port", listenerPort) - - // Determine internal address... - var intAddr *NetAddress - intAddr, err = NewNetAddressStringWithOptionalID(lAddr) - if err != nil { - panic(err) - } - - inAddrAny := lAddrIP == "" || lAddrIP == "0.0.0.0" - - // Determine external address. - var extAddr *NetAddress - - if externalAddrString != "" { - var err error - extAddr, err = NewNetAddressStringWithOptionalID(externalAddrString) - if err != nil { - panic(fmt.Sprintf("Error in ExternalAddress: %v", err)) - } - } - - // If the lAddrIP is INADDR_ANY, try UPnP. - if extAddr == nil && useUPnP && inAddrAny { - extAddr = getUPNPExternalAddress(lAddrPort, listenerPort, logger) - } - - // Otherwise just use the local address. - if extAddr == nil { - defaultToIPv4 := inAddrAny - extAddr = getNaiveExternalAddress(defaultToIPv4, listenerPort, false, logger) - } - if extAddr == nil { - panic("Could not determine external address!") - } - - dl := &DefaultListener{ - listener: listener, - intAddr: intAddr, - extAddr: extAddr, - connections: make(chan net.Conn, numBufferedConnections), - } - dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl) - err = dl.Start() // Started upon construction - if err != nil { - logger.Error("Error starting base service", "err", err) - } - return dl -} - -// OnStart implements cmn.Service by spinning a goroutine, listening for new -// connections. -func (l *DefaultListener) OnStart() error { - if err := l.BaseService.OnStart(); err != nil { - return err - } - go l.listenRoutine() - return nil -} - -// OnStop implements cmn.Service by closing the listener. -func (l *DefaultListener) OnStop() { - l.BaseService.OnStop() - l.listener.Close() // nolint: errcheck -} - -// Accept connections and pass on the channel. -func (l *DefaultListener) listenRoutine() { - for { - conn, err := l.listener.Accept() - - if !l.IsRunning() { - break // Go to cleanup - } - - // listener wasn't stopped, - // yet we encountered an error. - if err != nil { - panic(err) - } - - l.connections <- conn - } - - // Cleanup - close(l.connections) - for range l.connections { - // Drain - } -} - -// Connections returns a channel of inbound connections. -// It gets closed when the listener closes. -// It is the callers responsibility to close any connections received -// over this channel. -func (l *DefaultListener) Connections() <-chan net.Conn { - return l.connections -} - -// InternalAddress returns the internal NetAddress (address used for -// listening). -func (l *DefaultListener) InternalAddress() *NetAddress { - return l.intAddr -} - -// ExternalAddress returns the external NetAddress (publicly available, -// determined using either UPnP or local resolver). -func (l *DefaultListener) ExternalAddress() *NetAddress { - return l.extAddr -} - -// ExternalAddressHost returns the external NetAddress IP string. If an IP is -// IPv6, it's wrapped in brackets ("[2001:db8:1f70::999:de8:7648:6e8]"). -func (l *DefaultListener) ExternalAddressHost() string { - ip := l.ExternalAddress().IP - if isIpv6(ip) { - // Means it's ipv6, so format it with brackets - return "[" + ip.String() + "]" - } - return ip.String() -} - -func (l *DefaultListener) String() string { - return fmt.Sprintf("Listener(@%v)", l.extAddr) -} - -/* external address helpers */ - -// UPNP external address discovery & port mapping -func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *NetAddress { - logger.Info("Getting UPNP external address") - nat, err := upnp.Discover() - if err != nil { - logger.Info("Could not perform UPNP discover", "err", err) - return nil - } - - ext, err := nat.GetExternalAddress() - if err != nil { - logger.Info("Could not get UPNP external address", "err", err) - return nil - } - - // UPnP can't seem to get the external port, so let's just be explicit. - if externalPort == 0 { - externalPort = defaultExternalPort - } - - externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "tendermint", 0) - if err != nil { - logger.Info("Could not add UPNP port mapping", "err", err) - return nil - } - - logger.Info("Got UPNP external address", "address", ext) - return NewNetAddressIPPort(ext, uint16(externalPort)) -} - -func isIpv6(ip net.IP) bool { - v4 := ip.To4() - if v4 != nil { - return false - } - - ipString := ip.String() - - // Extra check just to be sure it's IPv6 - return (strings.Contains(ipString, ":") && !strings.Contains(ipString, ".")) -} - -// TODO: use syscalls: see issue #712 -func getNaiveExternalAddress(defaultToIPv4 bool, port int, settleForLocal bool, logger log.Logger) *NetAddress { - addrs, err := net.InterfaceAddrs() - if err != nil { - panic(fmt.Sprintf("Could not fetch interface addresses: %v", err)) - } - - for _, a := range addrs { - ipnet, ok := a.(*net.IPNet) - if !ok { - continue - } - if defaultToIPv4 || !isIpv6(ipnet.IP) { - v4 := ipnet.IP.To4() - if v4 == nil || (!settleForLocal && v4[0] == 127) { - // loopback - continue - } - } else if !settleForLocal && ipnet.IP.IsLoopback() { - // IPv6, check for loopback - continue - } - return NewNetAddressIPPort(ipnet.IP, uint16(port)) - } - - // try again, but settle for local - logger.Info("Node may not be connected to internet. Settling for local address") - return getNaiveExternalAddress(defaultToIPv4, port, true, logger) -} diff --git a/p2p/listener_test.go b/p2p/listener_test.go deleted file mode 100644 index f87b5d6f5..000000000 --- a/p2p/listener_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package p2p - -import ( - "bytes" - "net" - "strings" - "testing" - - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/log" -) - -func TestListener(t *testing.T) { - // Create a listener - l := NewDefaultListener("tcp://:8001", "", false, log.TestingLogger()) - - // Dial the listener - lAddr := l.ExternalAddress() - connOut, err := lAddr.Dial() - if err != nil { - t.Fatalf("Could not connect to listener address %v", lAddr) - } else { - t.Logf("Created a connection to listener address %v", lAddr) - } - connIn, ok := <-l.Connections() - if !ok { - t.Fatalf("Could not get inbound connection from listener") - } - - msg := []byte("hi!") - go func() { - _, err := connIn.Write(msg) - if err != nil { - t.Error(err) - } - }() - b := make([]byte, 32) - n, err := connOut.Read(b) - if err != nil { - t.Fatalf("Error reading off connection: %v", err) - } - - b = b[:n] - if !bytes.Equal(msg, b) { - t.Fatalf("Got %s, expected %s", b, msg) - } - - // Close the server, no longer needed. - l.Stop() -} - -func TestExternalAddress(t *testing.T) { - { - // Create a listener with no external addr. Should default - // to local ipv4. - l := NewDefaultListener("tcp://:8001", "", false, log.TestingLogger()) - lAddr := l.ExternalAddress().String() - _, _, err := net.SplitHostPort(lAddr) - require.Nil(t, err) - spl := strings.Split(lAddr, ".") - require.Equal(t, len(spl), 4) - l.Stop() - } - - { - // Create a listener with set external ipv4 addr. - setExAddr := "8.8.8.8:8080" - l := NewDefaultListener("tcp://:8001", setExAddr, false, log.TestingLogger()) - lAddr := l.ExternalAddress().String() - require.Equal(t, lAddr, setExAddr) - l.Stop() - } - - { - // Invalid external addr causes panic - setExAddr := "awrlsckjnal:8080" - require.Panics(t, func() { NewDefaultListener("tcp://:8001", setExAddr, false, log.TestingLogger()) }) - } -} diff --git a/p2p/peer.go b/p2p/peer.go index a5f0bbbd8..5dbc582c0 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "time" - crypto "github.com/tendermint/tendermint/crypto" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" @@ -130,87 +129,6 @@ func newPeer( return p } -func newOutboundPeerConn( - addr *NetAddress, - config *config.P2PConfig, - persistent bool, - ourNodePrivKey crypto.PrivKey, -) (peerConn, error) { - conn, err := dial(addr, config) - if err != nil { - return peerConn{}, cmn.ErrorWrap(err, "Error creating peer") - } - - pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey, addr) - if err != nil { - if cerr := conn.Close(); cerr != nil { - return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) - } - return peerConn{}, err - } - - // ensure dialed ID matches connection ID - if addr.ID != pc.ID() { - if cerr := conn.Close(); cerr != nil { - return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) - } - return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()} - } - - return pc, nil -} - -func newInboundPeerConn( - conn net.Conn, - config *config.P2PConfig, - ourNodePrivKey crypto.PrivKey, -) (peerConn, error) { - - // TODO: issue PoW challenge - - return newPeerConn(conn, config, false, false, ourNodePrivKey, nil) -} - -func newPeerConn( - rawConn net.Conn, - cfg *config.P2PConfig, - outbound, persistent bool, - ourNodePrivKey crypto.PrivKey, - originalAddr *NetAddress, -) (pc peerConn, err error) { - conn := rawConn - - // Fuzz connection - if cfg.TestFuzz { - // so we have time to do peer handshakes and get set up - conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig) - } - - // Set deadline for secret handshake - dl := time.Now().Add(cfg.HandshakeTimeout) - if err := conn.SetDeadline(dl); err != nil { - return pc, cmn.ErrorWrap( - err, - "Error setting deadline while encrypting connection", - ) - } - - // Encrypt connection - conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey) - if err != nil { - return pc, cmn.ErrorWrap(err, "Error creating peer") - } - - // Only the information we already have - return peerConn{ - config: cfg, - outbound: outbound, - persistent: persistent, - conn: conn, - originalAddr: originalAddr, - }, nil -} - //--------------------------------------------------- // Implements cmn.Service @@ -399,18 +317,6 @@ func (p *peer) String() string { //------------------------------------------------------------------ // helper funcs -func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { - if cfg.TestDialFail { - return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)") - } - - conn, err := addr.DialTimeout(cfg.DialTimeout) - if err != nil { - return nil, err - } - return conn, nil -} - func createMConnection( conn net.Conn, p *peer, diff --git a/p2p/peer_test.go b/p2p/peer_test.go index f0e915328..a2a2946a1 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" golog "log" "net" "testing" @@ -76,7 +77,7 @@ func createOutboundPeerAndPerformHandshake( } reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} pk := ed25519.GenPrivKey() - pc, err := newOutboundPeerConn(addr, config, false, pk) + pc, err := testOutboundPeerConn(addr, config, false, pk) if err != nil { return nil, err } @@ -96,6 +97,48 @@ func createOutboundPeerAndPerformHandshake( return p, nil } +func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { + if cfg.TestDialFail { + return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)") + } + + conn, err := addr.DialTimeout(cfg.DialTimeout) + if err != nil { + return nil, err + } + return conn, nil +} + +func testOutboundPeerConn( + addr *NetAddress, + config *config.P2PConfig, + persistent bool, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { + conn, err := testDial(addr, config) + if err != nil { + return peerConn{}, cmn.ErrorWrap(err, "Error creating peer") + } + + pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr) + if err != nil { + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) + } + return peerConn{}, err + } + + // ensure dialed ID matches connection ID + if addr.ID != pc.ID() { + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) + } + return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()} + } + + return pc, nil +} + type remotePeer struct { PrivKey crypto.PrivKey Config *config.P2PConfig @@ -143,19 +186,19 @@ func (rp *remotePeer) accept(l net.Listener) { golog.Fatalf("Failed to accept conn: %+v", err) } - pc, err := newInboundPeerConn(conn, rp.Config, rp.PrivKey) + pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } - _, err = pc.HandshakeTimeout(NodeInfo{ + _, err = handshake(pc.conn, time.Second, NodeInfo{ ID: rp.Addr().ID, Moniker: "remote_peer", Network: "testing", Version: "123.123.123", ListenAddr: l.Addr().String(), Channels: rp.channels, - }, 1*time.Second) + }) if err != nil { golog.Fatalf("Failed to perform handshake: %+v", err) } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index f72f81e06..c22eabdc1 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -109,9 +109,7 @@ func TestPEXReactorRunning(t *testing.T) { addOtherNodeAddrToAddrBook(1, 0) addOtherNodeAddrToAddrBook(2, 1) - for i, sw := range switches { - sw.AddListener(p2p.NewDefaultListener("tcp://"+sw.NodeInfo().ListenAddr, "", false, logger.With("pex", i))) - + for _, sw := range switches { err := sw.Start() // start switch and reactors require.Nil(t, err) } @@ -474,9 +472,6 @@ func testCreatePeerWithConfig(dir string, id int, config *PEXReactorConfig) *p2p return sw }, ) - peer.AddListener( - p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()), - ) return peer } @@ -510,9 +505,6 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress) return sw }, ) - seed.AddListener( - p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()), - ) return seed } diff --git a/p2p/switch.go b/p2p/switch.go index b5413dabf..57077e07d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -3,7 +3,6 @@ package p2p import ( "fmt" "math" - "net" "sync" "time" @@ -42,6 +41,10 @@ type AddrBook interface { Save() } +// PeerFilterFunc to be implemented by filter hooks after a new Peer has been +// fully setup. +type PeerFilterFunc func(IPeerSet, Peer) error + //----------------------------------------------------------------------------- // Switch handles peer connections and exposes an API to receive incoming messages @@ -52,7 +55,6 @@ type Switch struct { cmn.BaseService config *config.P2PConfig - listeners []Listener reactors map[string]Reactor chDescs []*conn.ChannelDescriptor reactorsByCh map[byte]Reactor @@ -63,8 +65,10 @@ type Switch struct { nodeKey *NodeKey // our node privkey addrBook AddrBook - filterConnByAddr func(net.Addr) error - filterConnByID func(ID) error + transport Transport + + filterTimeout time.Duration + peerFilters []PeerFilterFunc mConfig conn.MConnConfig @@ -77,16 +81,22 @@ type Switch struct { type SwitchOption func(*Switch) // NewSwitch creates a new Switch with the given config. -func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch { +func NewSwitch( + cfg *config.P2PConfig, + transport Transport, + options ...SwitchOption, +) *Switch { sw := &Switch{ - config: cfg, - reactors: make(map[string]Reactor), - chDescs: make([]*conn.ChannelDescriptor, 0), - reactorsByCh: make(map[byte]Reactor), - peers: NewPeerSet(), - dialing: cmn.NewCMap(), - reconnecting: cmn.NewCMap(), - metrics: NopMetrics(), + config: cfg, + reactors: make(map[string]Reactor), + chDescs: make([]*conn.ChannelDescriptor, 0), + reactorsByCh: make(map[byte]Reactor), + peers: NewPeerSet(), + dialing: cmn.NewCMap(), + reconnecting: cmn.NewCMap(), + metrics: NopMetrics(), + transport: transport, + filterTimeout: defaultFilterTimeout, } // Ensure we have a completely undeterministic PRNG. @@ -109,6 +119,16 @@ func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch { return sw } +// SwitchFilterTimeout sets the timeout used for peer filters. +func SwitchFilterTimeout(timeout time.Duration) SwitchOption { + return func(sw *Switch) { sw.filterTimeout = timeout } +} + +// SwitchPeerFilters sets the filters for rejection of new peers. +func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption { + return func(sw *Switch) { sw.peerFilters = filters } +} + // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) SwitchOption { return func(sw *Switch) { sw.metrics = metrics } @@ -148,24 +168,6 @@ func (sw *Switch) Reactor(name string) Reactor { return sw.reactors[name] } -// AddListener adds the given listener to the switch for listening to incoming peer connections. -// NOTE: Not goroutine safe. -func (sw *Switch) AddListener(l Listener) { - sw.listeners = append(sw.listeners, l) -} - -// Listeners returns the list of listeners the switch listens on. -// NOTE: Not goroutine safe. -func (sw *Switch) Listeners() []Listener { - return sw.listeners -} - -// IsListening returns true if the switch has at least one listener. -// NOTE: Not goroutine safe. -func (sw *Switch) IsListening() bool { - return len(sw.listeners) > 0 -} - // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. // NOTE: Not goroutine safe. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) { @@ -187,7 +189,7 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { //--------------------------------------------------------------------- // Service start/stop -// OnStart implements BaseService. It starts all the reactors, peers, and listeners. +// OnStart implements BaseService. It starts all the reactors and peers. func (sw *Switch) OnStart() error { // Start reactors for _, reactor := range sw.reactors { @@ -196,25 +198,21 @@ func (sw *Switch) OnStart() error { return cmn.ErrorWrap(err, "failed to start %v", reactor) } } - // Start listeners - for _, listener := range sw.listeners { - go sw.listenerRoutine(listener) - } + + // Start accepting Peers. + go sw.acceptRoutine() + return nil } -// OnStop implements BaseService. It stops all listeners, peers, and reactors. +// OnStop implements BaseService. It stops all peers and reactors. func (sw *Switch) OnStop() { - // Stop listeners - for _, listener := range sw.listeners { - listener.Stop() - } - sw.listeners = nil // Stop peers - for _, peer := range sw.peers.List() { - peer.Stop() - sw.peers.Remove(peer) + for _, p := range sw.peers.List() { + p.Stop() + sw.peers.Remove(p) } + // Stop reactors sw.Logger.Debug("Switch: Stopping reactors") for _, reactor := range sw.reactors { @@ -459,42 +457,46 @@ func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool { (!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP)) } -//------------------------------------------------------------------------------------ -// Connection filtering - -// FilterConnByAddr returns an error if connecting to the given address is forbidden. -func (sw *Switch) FilterConnByAddr(addr net.Addr) error { - if sw.filterConnByAddr != nil { - return sw.filterConnByAddr(addr) - } - return nil -} - -// FilterConnByID returns an error if connecting to the given peer ID is forbidden. -func (sw *Switch) FilterConnByID(id ID) error { - if sw.filterConnByID != nil { - return sw.filterConnByID(id) - } - return nil - -} - -// SetAddrFilter sets the function for filtering connections by address. -func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { - sw.filterConnByAddr = f -} - -// SetIDFilter sets the function for filtering connections by peer ID. -func (sw *Switch) SetIDFilter(f func(ID) error) { - sw.filterConnByID = f -} +func (sw *Switch) acceptRoutine() { + for { + p, err := sw.transport.Accept(peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, + }) + if err != nil { + switch err.(type) { + case ErrRejected: + rErr := err.(ErrRejected) + + if rErr.IsSelf() { + // Remove the given address from the address book and add to our addresses + // to avoid dialing in the future. + addr := rErr.Addr() + sw.addrBook.RemoveAddress(&addr) + sw.addrBook.AddOurAddress(&addr) + } -//------------------------------------------------------------------------------------ + sw.Logger.Info( + "Inbound Peer rejected", + "err", err, + "numPeers", sw.peers.Size(), + ) + + continue + case *ErrTransportClosed: + sw.Logger.Error( + "Stopped accept routine, as transport is closed", + "numPeers", sw.peers.Size(), + ) + default: + sw.Logger.Error( + "Accept on transport errored", + "err", err, + "numPeers", sw.peers.Size(), + ) + } -func (sw *Switch) listenerRoutine(l Listener) { - for { - inConn, ok := <-l.Connections() - if !ok { break } @@ -503,41 +505,25 @@ func (sw *Switch) listenerRoutine(l Listener) { if in >= sw.config.MaxNumInboundPeers { sw.Logger.Info( "Ignoring inbound connection: already have enough inbound peers", - "address", inConn.RemoteAddr().String(), + "address", p.NodeInfo().NetAddress().String(), "have", in, "max", sw.config.MaxNumInboundPeers, ) - inConn.Close() - continue - } - // New inbound connection! - err := sw.addInboundPeerWithConfig(inConn, sw.config) - if err != nil { - sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) + _ = p.Stop() + continue } - } - - // cleanup -} -// closes conn if err is returned -func (sw *Switch) addInboundPeerWithConfig( - conn net.Conn, - config *config.P2PConfig, -) error { - peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey) - if err != nil { - conn.Close() // peer is nil - return err - } - if err = sw.addPeer(peerConn); err != nil { - peerConn.CloseConn() - return err + if err := sw.addPeer(p); err != nil { + _ = p.Stop() + sw.Logger.Info( + "Ignoring inbound connection: error while adding peer", + "err", err, + "id", p.ID(), + ) + } } - - return nil } // dial the peer; make secret connection; authenticate against the dialed ID; @@ -547,109 +533,88 @@ func (sw *Switch) addInboundPeerWithConfig( // StopPeerForError is called func (sw *Switch) addOutboundPeerWithConfig( addr *NetAddress, - config *config.P2PConfig, + cfg *config.P2PConfig, persistent bool, ) error { sw.Logger.Info("Dialing peer", "address", addr) - peerConn, err := newOutboundPeerConn( - addr, - config, - persistent, - sw.nodeKey.PrivKey, - ) + + // XXX(xla): Remove the leakage of test concerns in implementation. + if cfg.TestDialFail { + go sw.reconnectToPeer(addr) + return fmt.Errorf("dial err (peerConfig.DialFail == true)") + } + + p, err := sw.transport.Dial(*addr, peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + persistent: persistent, + reactorsByCh: sw.reactorsByCh, + }) if err != nil { + switch e := err.(type) { + case ErrRejected: + if e.IsSelf() { + // Remove the given address from the address book and add to our addresses + // to avoid dialing in the future. + sw.addrBook.RemoveAddress(addr) + sw.addrBook.AddOurAddress(addr) + } + } + if persistent { go sw.reconnectToPeer(addr) } + return err } - if err := sw.addPeer(peerConn); err != nil { - peerConn.CloseConn() + if err := sw.addPeer(p); err != nil { + _ = p.Stop() return err } + return nil } -// addPeer performs the Tendermint P2P handshake with a peer -// that already has a SecretConnection. If all goes well, -// it starts the peer and adds it to the switch. -// NOTE: This performs a blocking handshake before the peer is added. -// NOTE: If error is returned, caller is responsible for calling -// peer.CloseConn() -func (sw *Switch) addPeer(pc peerConn) error { - - addr := pc.conn.RemoteAddr() - if err := sw.FilterConnByAddr(addr); err != nil { - return err - } - - // Exchange NodeInfo on the conn - peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout)) - if err != nil { - return err +func (sw *Switch) filterPeer(p Peer) error { + // Avoid duplicate + if sw.peers.Has(p.ID()) { + return ErrRejected{id: p.ID(), isDuplicate: true} } - peerID := peerNodeInfo.ID + errc := make(chan error, len(sw.peerFilters)) - // ensure connection key matches self reported key - connID := pc.ID() - - if peerID != connID { - return fmt.Errorf( - "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)", - peerID, - connID, - ) + for _, f := range sw.peerFilters { + go func(f PeerFilterFunc, p Peer, errc chan<- error) { + errc <- f(sw.peers, p) + }(f, p, errc) } - // Validate the peers nodeInfo - if err := peerNodeInfo.Validate(); err != nil { - return err - } - - // Avoid self - if sw.nodeKey.ID() == peerID { - addr := peerNodeInfo.NetAddress() - // remove the given address from the address book - // and add to our addresses to avoid dialing again - if sw.addrBook != nil { - sw.addrBook.RemoveAddress(addr) - sw.addrBook.AddOurAddress(addr) + for i := 0; i < cap(errc); i++ { + select { + case err := <-errc: + if err != nil { + return ErrRejected{id: p.ID(), err: err, isFiltered: true} + } + case <-time.After(sw.filterTimeout): + return ErrFilterTimeout{} } - return ErrSwitchConnectToSelf{addr} - } - - // Avoid duplicate - if sw.peers.Has(peerID) { - return ErrSwitchDuplicatePeerID{peerID} - } - - // Check for duplicate connection or peer info IP. - if !sw.config.AllowDuplicateIP && - (sw.peers.HasIP(pc.RemoteIP()) || - sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) { - return ErrSwitchDuplicatePeerIP{pc.RemoteIP()} } - // Filter peer against ID white list - if err := sw.FilterConnByID(peerID); err != nil { - return err - } + return nil +} - // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil { +// addPeer starts up the Peer and adds it to the Switch. +func (sw *Switch) addPeer(p Peer) error { + if err := sw.filterPeer(p); err != nil { return err } - peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) - peer.SetLogger(sw.Logger.With("peer", addr)) - - peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo) + p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress().String)) // All good. Start peer if sw.IsRunning() { - if err = sw.startInitPeer(peer); err != nil { + if err := sw.startInitPeer(p); err != nil { return err } } @@ -657,25 +622,30 @@ func (sw *Switch) addPeer(pc peerConn) error { // Add the peer to .peers. // We start it first so that a peer in the list is safe to Stop. // It should not err since we already checked peers.Has(). - if err := sw.peers.Add(peer); err != nil { + if err := sw.peers.Add(p); err != nil { return err } + + sw.Logger.Info("Added peer", "peer", p) sw.metrics.Peers.Add(float64(1)) - sw.Logger.Info("Added peer", "peer", peer) return nil } -func (sw *Switch) startInitPeer(peer *peer) error { - err := peer.Start() // spawn send/recv routines +func (sw *Switch) startInitPeer(p Peer) error { + err := p.Start() // spawn send/recv routines if err != nil { // Should never happen - sw.Logger.Error("Error starting peer", "peer", peer, "err", err) + sw.Logger.Error( + "Error starting peer", + "err", err, + "peer", p, + ) return err } for _, reactor := range sw.reactors { - reactor.AddPeer(peer) + reactor.AddPeer(p) } return nil diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 2ce297761..4fea3cfe0 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -3,7 +3,6 @@ package p2p import ( "bytes" "fmt" - "net" "sync" "testing" "time" @@ -11,10 +10,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/log" - - "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" ) @@ -151,35 +149,6 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r } } -func TestConnAddrFilter(t *testing.T) { - s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - defer s1.Stop() - defer s2.Stop() - - c1, c2 := conn.NetPipe() - - s1.SetAddrFilter(func(addr net.Addr) error { - if addr.String() == c1.RemoteAddr().String() { - return fmt.Errorf("Error: pipe is blacklisted") - } - return nil - }) - - // connect to good peer - go func() { - err := s1.addPeerWithConnection(c1) - assert.NotNil(t, err, "expected err") - }() - go func() { - err := s2.addPeerWithConnection(c2) - assert.NotNil(t, err, "expected err") - }() - - assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond) - assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond) -} - func TestSwitchFiltersOutItself(t *testing.T) { s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc) // addr := s1.NodeInfo().NetAddress() @@ -194,11 +163,16 @@ func TestSwitchFiltersOutItself(t *testing.T) { // addr should be rejected in addPeer based on the same ID err := s1.DialPeerWithAddress(rp.Addr(), false) if assert.Error(t, err) { - assert.Equal(t, ErrSwitchConnectToSelf{rp.Addr()}.Error(), err.Error()) + if err, ok := err.(ErrRejected); ok { + if !err.IsSelf() { + t.Errorf("expected self to be rejected") + } + } else { + t.Errorf("expected ErrRejected") + } } assert.True(t, s1.addrBook.OurAddress(rp.Addr())) - assert.False(t, s1.addrBook.HasAddress(rp.Addr())) rp.Stop() @@ -206,46 +180,124 @@ func TestSwitchFiltersOutItself(t *testing.T) { assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond) } -func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) { - time.Sleep(timeout) - if sw.Peers().Size() != 0 { - t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size()) +func TestSwitchPeerFilter(t *testing.T) { + var ( + filters = []PeerFilterFunc{ + func(_ IPeerSet, _ Peer) error { return nil }, + func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") }, + func(_ IPeerSet, _ Peer) error { return nil }, + } + sw = MakeSwitch( + cfg, + 1, + "testing", + "123.123.123", + initSwitchFunc, + SwitchPeerFilters(filters...), + ) + ) + defer sw.Stop() + + // simulate remote peer + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() + + p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, + }) + if err != nil { + t.Fatal(err) + } + + err = sw.addPeer(p) + if err, ok := err.(ErrRejected); ok { + if !err.IsFiltered() { + t.Errorf("expected peer to be filtered") + } + } else { + t.Errorf("expected ErrRejected") } } -func TestConnIDFilter(t *testing.T) { - s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - defer s1.Stop() - defer s2.Stop() +func TestSwitchPeerFilterTimeout(t *testing.T) { + var ( + filters = []PeerFilterFunc{ + func(_ IPeerSet, _ Peer) error { + time.Sleep(10 * time.Millisecond) + return nil + }, + } + sw = MakeSwitch( + cfg, + 1, + "testing", + "123.123.123", + initSwitchFunc, + SwitchFilterTimeout(5*time.Millisecond), + SwitchPeerFilters(filters...), + ) + ) + defer sw.Stop() - c1, c2 := conn.NetPipe() + // simulate remote peer + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() - s1.SetIDFilter(func(id ID) error { - if id == s2.nodeInfo.ID { - return fmt.Errorf("Error: pipe is blacklisted") - } - return nil + p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, }) + if err != nil { + t.Fatal(err) + } - s2.SetIDFilter(func(id ID) error { - if id == s1.nodeInfo.ID { - return fmt.Errorf("Error: pipe is blacklisted") - } - return nil + err = sw.addPeer(p) + if _, ok := err.(ErrFilterTimeout); !ok { + t.Errorf("expected ErrFilterTimeout") + } +} + +func TestSwitchPeerFilterDuplicate(t *testing.T) { + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + + // simulate remote peer + rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + rp.Start() + defer rp.Stop() + + p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, }) + if err != nil { + t.Fatal(err) + } - go func() { - err := s1.addPeerWithConnection(c1) - assert.NotNil(t, err, "expected error") - }() - go func() { - err := s2.addPeerWithConnection(c2) - assert.NotNil(t, err, "expected error") - }() + if err := sw.addPeer(p); err != nil { + t.Fatal(err) + } + + err = sw.addPeer(p) + if err, ok := err.(ErrRejected); ok { + if !err.IsDuplicate() { + t.Errorf("expected peer to be duplicate") + } + } else { + t.Errorf("expected ErrRejected") + } +} - assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond) - assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond) +func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) { + time.Sleep(timeout) + if sw.Peers().Size() != 0 { + t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size()) + } } func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { @@ -263,19 +315,23 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey) + p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + reactorsByCh: sw.reactorsByCh, + }) require.Nil(err) - err = sw.addPeer(pc) + + err = sw.addPeer(p) require.Nil(err) - peer := sw.Peers().Get(rp.ID()) - require.NotNil(peer) + require.NotNil(sw.Peers().Get(rp.ID())) // simulate failure by closing connection - pc.CloseConn() + p.(*peer).CloseConn() assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond) - assert.False(peer.IsRunning()) + assert.False(p.IsRunning()) } func TestSwitchReconnectsToPersistentPeer(t *testing.T) { @@ -293,17 +349,20 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey) - // sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, + p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ + chDescs: sw.chDescs, + onPeerError: sw.StopPeerForError, + persistent: true, + reactorsByCh: sw.reactorsByCh, + }) require.Nil(err) - require.Nil(sw.addPeer(pc)) + require.Nil(sw.addPeer(p)) - peer := sw.Peers().Get(rp.ID()) - require.NotNil(peer) + require.NotNil(sw.Peers().Get(rp.ID())) // simulate failure by closing connection - pc.CloseConn() + p.(*peer).CloseConn() // TODO: remove sleep, detect the disconnection, wait for reconnect npeers := sw.Peers().Size() @@ -315,7 +374,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { } } assert.NotZero(npeers) - assert.False(peer.IsRunning()) + assert.False(p.IsRunning()) // simulate another remote peer rp = &remotePeer{ diff --git a/p2p/test_util.go b/p2p/test_util.go index b88dfb06a..64b8b215c 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -3,7 +3,9 @@ package p2p import ( "fmt" "net" + "time" + crypto "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" @@ -104,14 +106,32 @@ func Connect2Switches(switches []*Switch, i, j int) { } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - pc, err := newInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) + pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) } return err } - if err = sw.addPeer(pc); err != nil { + + ni, err := handshake(conn, 50*time.Millisecond, sw.nodeInfo) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } + + p := newPeer( + pc, + sw.mConfig, + ni, + sw.reactorsByCh, + sw.chDescs, + sw.StopPeerForError, + ) + + if err = sw.addPeer(p); err != nil { pc.CloseConn() return err } @@ -131,35 +151,99 @@ func StartSwitches(switches []*Switch) error { return nil } -func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { - // new switch, add reactors - // TODO: let the config be passed in? - nodeKey := &NodeKey{ - PrivKey: ed25519.GenPrivKey(), +func MakeSwitch( + cfg *config.P2PConfig, + i int, + network, version string, + initSwitch func(int, *Switch) *Switch, + opts ...SwitchOption, +) *Switch { + var ( + nodeKey = NodeKey{ + PrivKey: ed25519.GenPrivKey(), + } + ni = NodeInfo{ + ID: nodeKey.ID(), + Moniker: fmt.Sprintf("switch%d", i), + Network: network, + Version: version, + ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), + Other: NodeInfoOther{ + AminoVersion: "1.0", + P2PVersion: "1.0", + ConsensusVersion: "1.0", + RPCVersion: "1.0", + TxIndex: "off", + RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), + }, + } + ) + + addr, err := NewNetAddressStringWithOptionalID( + IDAddressString(nodeKey.ID(), ni.ListenAddr), + ) + if err != nil { + panic(err) } - sw := NewSwitch(cfg) - sw.SetLogger(log.TestingLogger()) - sw = initSwitch(i, sw) - addr := fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023) - ni := NodeInfo{ - ID: nodeKey.ID(), - Moniker: fmt.Sprintf("switch%d", i), - Network: network, - Version: version, - ListenAddr: addr, - Other: NodeInfoOther{ - AminoVersion: "1.0", - P2PVersion: "1.0", - ConsensusVersion: "1.0", - RPCVersion: "1.0", - TxIndex: "off", - RPCAddress: addr, - }, + + t := NewMultiplexTransport(ni, nodeKey) + + if err := t.Listen(*addr); err != nil { + panic(err) } + + // TODO: let the config be passed in? + sw := initSwitch(i, NewSwitch(cfg, t, opts...)) + sw.SetLogger(log.TestingLogger()) + sw.SetNodeKey(&nodeKey) + for ch := range sw.reactorsByCh { ni.Channels = append(ni.Channels, ch) } + + // TODO: We need to setup reactors ahead of time so the NodeInfo is properly + // populated and we don't have to do those awkward overrides and setters. + t.nodeInfo = ni sw.SetNodeInfo(ni) - sw.SetNodeKey(nodeKey) + return sw } + +func testInboundPeerConn( + conn net.Conn, + config *config.P2PConfig, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { + return testPeerConn(conn, config, false, false, ourNodePrivKey, nil) +} + +func testPeerConn( + rawConn net.Conn, + cfg *config.P2PConfig, + outbound, persistent bool, + ourNodePrivKey crypto.PrivKey, + originalAddr *NetAddress, +) (pc peerConn, err error) { + conn := rawConn + + // Fuzz connection + if cfg.TestFuzz { + // so we have time to do peer handshakes and get set up + conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig) + } + + // Encrypt connection + conn, err = upgradeSecretConn(conn, cfg.HandshakeTimeout, ourNodePrivKey) + if err != nil { + return pc, cmn.ErrorWrap(err, "Error creating peer") + } + + // Only the information we already have + return peerConn{ + config: cfg, + outbound: outbound, + persistent: persistent, + conn: conn, + originalAddr: originalAddr, + }, nil +} diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 4e4c54dea..193fbd285 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -191,7 +191,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { // ``` func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. - peers := p2pSwitch.Peers().List() + peers := p2pPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) for i, peer := range peers { peerState := peer.Get(types.PeerStateKey).(*cm.PeerState) diff --git a/rpc/core/net.go b/rpc/core/net.go index ba9753d81..acb18a34f 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -35,13 +35,8 @@ import ( // } // ``` func NetInfo() (*ctypes.ResultNetInfo, error) { - listening := p2pSwitch.IsListening() - listeners := []string{} - for _, listener := range p2pSwitch.Listeners() { - listeners = append(listeners, listener.String()) - } peers := []ctypes.Peer{} - for _, peer := range p2pSwitch.Peers().List() { + for _, peer := range p2pPeers.Peers().List() { peers = append(peers, ctypes.Peer{ NodeInfo: peer.NodeInfo(), IsOutbound: peer.IsOutbound(), @@ -52,8 +47,8 @@ func NetInfo() (*ctypes.ResultNetInfo, error) { // PRO: useful info // CON: privacy return &ctypes.ResultNetInfo{ - Listening: listening, - Listeners: listeners, + Listening: p2pTransport.IsListening(), + Listeners: p2pTransport.Listeners(), NPeers: len(peers), Peers: peers, }, nil @@ -65,7 +60,7 @@ func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { } // starts go routines to dial each peer after random delays logger.Info("DialSeeds", "addrBook", addrBook, "seeds", seeds) - err := p2pSwitch.DialPeersAsync(addrBook, seeds, false) + err := p2pPeers.DialPeersAsync(addrBook, seeds, false) if err != nil { return &ctypes.ResultDialSeeds{}, err } @@ -78,7 +73,7 @@ func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, } // starts go routines to dial each peer after random delays logger.Info("DialPeers", "addrBook", addrBook, "peers", peers, "persistent", persistent) - err := p2pSwitch.DialPeersAsync(addrBook, peers, persistent) + err := p2pPeers.DialPeersAsync(addrBook, peers, persistent) if err != nil { return &ctypes.ResultDialPeers{}, err } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 1d1f61466..188ea1c36 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -34,13 +34,16 @@ type Consensus interface { GetRoundStateSimpleJSON() ([]byte, error) } -type P2P interface { - Listeners() []p2p.Listener - Peers() p2p.IPeerSet - NumPeers() (outbound, inbound, dialig int) - NodeInfo() p2p.NodeInfo +type transport interface { + Listeners() []string IsListening() bool + NodeInfo() p2p.NodeInfo +} + +type peers interface { DialPeersAsync(p2p.AddrBook, []string, bool) error + NumPeers() (outbound, inbound, dialig int) + Peers() p2p.IPeerSet } //---------------------------------------------- @@ -56,7 +59,8 @@ var ( blockStore sm.BlockStore evidencePool sm.EvidencePool consensusState Consensus - p2pSwitch P2P + p2pPeers peers + p2pTransport transport // objects pubKey crypto.PubKey @@ -90,8 +94,12 @@ func SetConsensusState(cs Consensus) { consensusState = cs } -func SetSwitch(sw P2P) { - p2pSwitch = sw +func SetP2PPeers(p peers) { + p2pPeers = p +} + +func SetP2PTransport(t transport) { + p2pTransport = t } func SetPubKey(pk crypto.PubKey) { diff --git a/rpc/core/status.go b/rpc/core/status.go index de8d69cce..17fb2f341 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -91,7 +91,7 @@ func Status() (*ctypes.ResultStatus, error) { } result := &ctypes.ResultStatus{ - NodeInfo: p2pSwitch.NodeInfo(), + NodeInfo: p2pTransport.NodeInfo(), SyncInfo: ctypes.SyncInfo{ LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash,