diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 63e3c72bb..49913c10e 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -1,6 +1,7 @@ package blockchain import ( + "net" "testing" cmn "github.com/tendermint/tmlibs/common" @@ -204,3 +205,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false } func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} +func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 5f04a3308..f18f16230 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -27,7 +27,7 @@ func init() { // Heal partition and ensure A sees the commit func TestByzantine(t *testing.T) { N := 4 - logger := consensusLogger() + logger := consensusLogger().With("test", "byzantine") css := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), newCounter) // give the byzantine validator a normal ticker diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go index 97fb7e2ef..fc2242366 100644 --- a/p2p/dummy/peer.go +++ b/p2p/dummy/peer.go @@ -1,6 +1,8 @@ package dummy import ( + "net" + p2p "github.com/tendermint/tendermint/p2p" tmconn "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" @@ -19,6 +21,7 @@ func NewPeer() *peer { kv: make(map[string]interface{}), } p.BaseService = *cmn.NewBaseService(nil, "peer", p) + return p } @@ -42,6 +45,11 @@ func (p *peer) NodeInfo() p2p.NodeInfo { return p2p.NodeInfo{} } +// RemoteIP always returns localhost. +func (p *peer) RemoteIP() net.IP { + return net.ParseIP("127.0.0.1") +} + // Status always returns empry connection status. func (p *peer) Status() tmconn.ConnectionStatus { return tmconn.ConnectionStatus{} diff --git a/p2p/errors.go b/p2p/errors.go index f4a09e6c0..fc477d1c2 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -1,14 +1,38 @@ package p2p import ( - "errors" "fmt" + "net" ) -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchConnectToSelf = errors.New("Connect to self") -) +// ErrSwitchDuplicatePeerID to be raised when a peer is connecting with a known +// ID. +type ErrSwitchDuplicatePeerID struct { + ID ID +} + +func (e ErrSwitchDuplicatePeerID) Error() string { + return fmt.Sprintf("Duplicate peer ID %v", e.ID) +} + +// ErrSwitchDuplicatePeerIP to be raised whena a peer is connecting with a known +// IP. +type ErrSwitchDuplicatePeerIP struct { + IP net.IP +} + +func (e ErrSwitchDuplicatePeerIP) Error() string { + return fmt.Sprintf("Duplicate peer IP %v", e.IP.String()) +} + +// ErrSwitchConnectToSelf to be raised when trying to connect to itself. +type ErrSwitchConnectToSelf struct { + Addr *NetAddress +} + +func (e ErrSwitchConnectToSelf) Error() string { + return fmt.Sprintf("Connect to self: %v", e.Addr) +} type ErrSwitchAuthenticationFailure struct { Dialed *NetAddress @@ -16,7 +40,11 @@ type ErrSwitchAuthenticationFailure struct { } func (e ErrSwitchAuthenticationFailure) Error() string { - return fmt.Sprintf("Failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got) + return fmt.Sprintf( + "Failed to authenticate peer. Dialed %v, but got peer with ID %s", + e.Dialed, + e.Got, + ) } //------------------------------------------------------------------- diff --git a/p2p/peer.go b/p2p/peer.go index b9c8f8b41..447225bfb 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,6 +3,7 @@ package p2p import ( "fmt" "net" + "sync/atomic" "time" "github.com/tendermint/go-crypto" @@ -12,11 +13,14 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) +var testIPSuffix uint32 = 0 + // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service ID() ID // peer's cryptographic ID + RemoteIP() net.IP // remote IP of the connection IsOutbound() bool // did we dial the peer IsPersistent() bool // do we redial this peer when we disconnect NodeInfo() NodeInfo // peer's info @@ -37,6 +41,7 @@ type peerConn struct { persistent bool config *PeerConfig conn net.Conn // source connection + ip net.IP } // ID only exists for SecretConnection. @@ -45,6 +50,35 @@ func (pc peerConn) ID() ID { return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey()) } +// Return the IP from the connection RemoteAddr +func (pc peerConn) RemoteIP() net.IP { + if pc.ip != nil { + return pc.ip + } + + // In test cases a conn could not be present at all or be an in-memory + // implementation where we want to return a fake ip. + if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" { + pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))} + + return pc.ip + } + + host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String()) + if err != nil { + panic(err) + } + + ips, err := net.LookupIP(host) + if err != nil { + panic(err) + } + + pc.ip = ips[0] + + return pc.ip +} + // peer implements Peer. // // Before using a peer, you will need to perform a handshake on connection. diff --git a/p2p/peer_set.go b/p2p/peer_set.go index a4565ea1d..66a7fdadb 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -1,12 +1,14 @@ package p2p import ( + "net" "sync" ) // IPeerSet has a (immutable) subset of the methods of PeerSet. type IPeerSet interface { Has(key ID) bool + HasIP(ip net.IP) bool Get(key ID) Peer List() []Peer Size() int @@ -36,12 +38,17 @@ func NewPeerSet() *PeerSet { } // Add adds the peer to the PeerSet. -// It returns ErrSwitchDuplicatePeer if the peer is already present. +// It returns an error carrying the reason, if the peer is already present. func (ps *PeerSet) Add(peer Peer) error { ps.mtx.Lock() defer ps.mtx.Unlock() + if ps.lookup[peer.ID()] != nil { - return ErrSwitchDuplicatePeer + return ErrSwitchDuplicatePeerID{peer.ID()} + } + + if ps.hasIP(peer.RemoteIP()) { + return ErrSwitchDuplicatePeerIP{peer.RemoteIP()} } index := len(ps.list) @@ -61,6 +68,27 @@ func (ps *PeerSet) Has(peerKey ID) bool { return ok } +// HasIP returns true if the PeerSet contains the peer referred to by this IP +// address. +func (ps *PeerSet) HasIP(peerIP net.IP) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + return ps.hasIP(peerIP) +} + +// hasIP does not acquire a lock so it can be used in public methods which +// already lock. +func (ps *PeerSet) hasIP(peerIP net.IP) bool { + for _, item := range ps.lookup { + if item.peer.RemoteIP().Equal(peerIP) { + return true + } + } + + return false +} + // Get looks up a peer by the provided peerKey. func (ps *PeerSet) Get(peerKey ID) Peer { ps.mtx.Lock() @@ -76,6 +104,7 @@ func (ps *PeerSet) Get(peerKey ID) Peer { func (ps *PeerSet) Remove(peer Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() + item := ps.lookup[peer.ID()] if item == nil { return diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 872758355..fc3004684 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -2,6 +2,7 @@ package p2p import ( "math/rand" + "net" "sync" "testing" @@ -12,23 +13,32 @@ import ( ) // Returns an empty kvstore peer -func randPeer() *peer { +func randPeer(ip net.IP) *peer { + if ip == nil { + ip = net.IP{127, 0, 0, 1} + } + nodeKey := NodeKey{PrivKey: crypto.GenPrivKeyEd25519()} - return &peer{ + p := &peer{ nodeInfo: NodeInfo{ ID: nodeKey.ID(), ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), }, } + + p.ip = ip + + return p } func TestPeerSetAddRemoveOne(t *testing.T) { t.Parallel() + peerSet := NewPeerSet() var peerList []Peer for i := 0; i < 5; i++ { - p := randPeer() + p := randPeer(net.IP{127, 0, 0, byte(i)}) if err := peerSet.Add(p); err != nil { t.Error(err) } @@ -72,7 +82,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { peers := []Peer{} N := 100 for i := 0; i < N; i++ { - peer := randPeer() + peer := randPeer(net.IP{127, 0, 0, byte(i)}) if err := peerSet.Add(peer); err != nil { t.Errorf("Failed to add new peer") } @@ -96,7 +106,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { func TestPeerSetAddDuplicate(t *testing.T) { t.Parallel() peerSet := NewPeerSet() - peer := randPeer() + peer := randPeer(nil) n := 20 errsChan := make(chan error) @@ -112,25 +122,49 @@ func TestPeerSetAddDuplicate(t *testing.T) { } // Now collect and tally the results - errsTally := make(map[error]int) + errsTally := make(map[string]int) for i := 0; i < n; i++ { err := <-errsChan - errsTally[err]++ + + switch err.(type) { + case ErrSwitchDuplicatePeerID: + errsTally["duplicateID"]++ + default: + errsTally["other"]++ + } } // Our next procedure is to ensure that only one addition // succeeded and that the rest are each ErrSwitchDuplicatePeer. - wantErrCount, gotErrCount := n-1, errsTally[ErrSwitchDuplicatePeer] + wantErrCount, gotErrCount := n-1, errsTally["duplicateID"] assert.Equal(t, wantErrCount, gotErrCount, "invalid ErrSwitchDuplicatePeer count") - wantNilErrCount, gotNilErrCount := 1, errsTally[nil] + wantNilErrCount, gotNilErrCount := 1, errsTally["other"] assert.Equal(t, wantNilErrCount, gotNilErrCount, "invalid nil errCount") } -func TestPeerSetGet(t *testing.T) { +func TestPeerSetAddDuplicateIP(t *testing.T) { t.Parallel() + peerSet := NewPeerSet() - peer := randPeer() + + if err := peerSet.Add(randPeer(net.IP{172, 0, 0, 1})); err != nil { + t.Fatal(err) + } + + // Add peer with same IP. + err := peerSet.Add(randPeer(net.IP{172, 0, 0, 1})) + assert.Equal(t, ErrSwitchDuplicatePeerIP{IP: net.IP{172, 0, 0, 1}}, err) +} + +func TestPeerSetGet(t *testing.T) { + t.Parallel() + + var ( + peerSet = NewPeerSet() + peer = randPeer(nil) + ) + assert.Nil(t, peerSet.Get(peer.ID()), "expecting a nil lookup, before .Add") if err := peerSet.Add(peer); err != nil { @@ -144,8 +178,8 @@ func TestPeerSetGet(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - got, want := peerSet.Get(peer.ID()), peer - assert.Equal(t, got, want, "#%d: got=%v want=%v", i, got, want) + have, want := peerSet.Get(peer.ID()), peer + assert.Equal(t, have, want, "%d: have %v, want %v", i, have, want) }(i) } wg.Wait() diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 24d750a9f..22913f2de 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -11,6 +11,7 @@ import ( crypto "github.com/tendermint/go-crypto" tmconn "github.com/tendermint/tendermint/p2p/conn" + cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" ) @@ -111,35 +112,44 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) } type remotePeer struct { - PrivKey crypto.PrivKey - Config *PeerConfig - addr *NetAddress - quit chan struct{} + PrivKey crypto.PrivKey + Config *PeerConfig + addr *NetAddress + quit chan struct{} + channels cmn.HexBytes + listenAddr string } -func (p *remotePeer) Addr() *NetAddress { - return p.addr +func (rp *remotePeer) Addr() *NetAddress { + return rp.addr } -func (p *remotePeer) ID() ID { - return PubKeyToID(p.PrivKey.PubKey()) +func (rp *remotePeer) ID() ID { + return PubKeyToID(rp.PrivKey.PubKey()) } -func (p *remotePeer) Start() { - l, e := net.Listen("tcp", "127.0.0.1:0") // any available address +func (rp *remotePeer) Start() { + if rp.listenAddr == "" { + rp.listenAddr = "127.0.0.1:0" + } + + l, e := net.Listen("tcp", rp.listenAddr) // any available address if e != nil { golog.Fatalf("net.Listen tcp :0: %+v", e) } - p.addr = NewNetAddress(PubKeyToID(p.PrivKey.PubKey()), l.Addr()) - p.quit = make(chan struct{}) - go p.accept(l) + rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr()) + rp.quit = make(chan struct{}) + if rp.channels == nil { + rp.channels = []byte{testCh} + } + go rp.accept(l) } -func (p *remotePeer) Stop() { - close(p.quit) +func (rp *remotePeer) Stop() { + close(rp.quit) } -func (p *remotePeer) accept(l net.Listener) { +func (rp *remotePeer) accept(l net.Listener) { conns := []net.Conn{} for { @@ -147,17 +157,19 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - pc, err := newInboundPeerConn(conn, p.Config, p.PrivKey) + + pc, err := newInboundPeerConn(conn, rp.Config, rp.PrivKey) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } + _, err = pc.HandshakeTimeout(NodeInfo{ - ID: p.Addr().ID, + ID: rp.Addr().ID, Moniker: "remote_peer", Network: "testing", Version: "123.123.123", ListenAddr: l.Addr().String(), - Channels: []byte{testCh}, + Channels: rp.channels, }, 1*time.Second) if err != nil { golog.Fatalf("Failed to perform handshake: %+v", err) @@ -166,7 +178,7 @@ func (p *remotePeer) accept(l net.Listener) { conns = append(conns, conn) select { - case <-p.quit: + case <-rp.quit: for _, conn := range conns { if err := conn.Close(); err != nil { golog.Fatal(err) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index f7297a343..fc40f6fa0 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -3,6 +3,7 @@ package pex import ( "fmt" "io/ioutil" + "net" "os" "path/filepath" "testing" @@ -72,7 +73,7 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + switches[i] = p2p.MakeSwitch(config, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) books[i].SetLogger(logger.With("pex", i)) sw.SetAddrBook(books[i]) @@ -365,6 +366,7 @@ func (mp mockPeer) NodeInfo() p2p.NodeInfo { ListenAddr: mp.addr.DialString(), } } +func (mp mockPeer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") } func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } func (mp mockPeer) Send(byte, []byte) bool { return false } func (mp mockPeer) TrySend(byte, []byte) bool { return false } diff --git a/p2p/switch.go b/p2p/switch.go index f62e5f992..6ea7e408f 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -403,8 +403,8 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b sw.randomSleep(0) err := sw.DialPeerWithAddress(addr, persistent) if err != nil { - switch err { - case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeer: + switch err.(type) { + case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID: sw.Logger.Debug("Error dialing peer", "err", err) default: sw.Logger.Error("Error dialing peer", "err", err) @@ -564,20 +564,22 @@ func (sw *Switch) addPeer(pc peerConn) error { // Avoid self if sw.nodeKey.ID() == peerID { addr := peerNodeInfo.NetAddress() - - // remove the given address from the address book if we added it earlier + // remove the given address from the address book + // and add to our addresses to avoid dialing again sw.addrBook.RemoveAddress(addr) - - // add the given address to the address book to avoid dialing ourselves - // again this is our public address sw.addrBook.AddOurAddress(addr) - - return ErrSwitchConnectToSelf + return ErrSwitchConnectToSelf{} } // Avoid duplicate if sw.peers.Has(peerID) { - return ErrSwitchDuplicatePeer + return ErrSwitchDuplicatePeerID{peerID} + } + + // Check for duplicate connection or peer info IP. + if sw.peers.HasIP(pc.RemoteIP()) || + sw.peers.HasIP(peerNodeInfo.NetAddress().IP) { + return ErrSwitchDuplicatePeerIP{pc.RemoteIP()} } // Filter peer against ID white list diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 25ed73bce..2c59d13e4 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -193,7 +193,7 @@ func TestSwitchFiltersOutItself(t *testing.T) { // addr should be rejected in addPeer based on the same ID err := s1.DialPeerWithAddress(rp.Addr(), false) if assert.Error(t, err) { - assert.Equal(t, ErrSwitchConnectToSelf, err) + assert.EqualValues(t, ErrSwitchConnectToSelf{}, err) } assert.True(t, s1.addrBook.OurAddress(rp.Addr())) @@ -317,7 +317,13 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert.False(peer.IsRunning()) // simulate another remote peer - rp = &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp = &remotePeer{ + PrivKey: crypto.GenPrivKeyEd25519(), + Config: DefaultPeerConfig(), + // Use different interface to prevent duplicate IP filter, this will break + // beyond two peers. + listenAddr: "127.0.0.2:0", + } rp.Start() defer rp.Stop() diff --git a/p2p/test_util.go b/p2p/test_util.go index 2c90bf516..86955f692 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -1,7 +1,9 @@ package p2p import ( + "fmt" "net" + "sync/atomic" crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" @@ -80,7 +82,9 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit func Connect2Switches(switches []*Switch, i, j int) { switchI := switches[i] switchJ := switches[j] + c1, c2 := conn.NetPipe() + doneCh := make(chan struct{}) go func() { err := switchI.addPeerWithConnection(c1) @@ -128,6 +132,8 @@ func StartSwitches(switches []*Switch) error { return nil } +var listenAddrSuffix uint32 = 1 + func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { // new switch, add reactors // TODO: let the config be passed in? @@ -142,7 +148,7 @@ func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f Moniker: cmn.Fmt("switch%d", i), Network: network, Version: version, - ListenAddr: cmn.Fmt("%v:%v", network, cmn.RandIntn(64512)+1023), + ListenAddr: fmt.Sprintf("127.0.0.%d:%d", atomic.AddUint32(&listenAddrSuffix, 1), cmn.RandIntn(64512)+1023), } for ch := range sw.reactorsByCh { ni.Channels = append(ni.Channels, ch)