Browse Source

2 kinds of peers: outbound and inbound

pull/456/head
Anton Kaliaev 8 years ago
parent
commit
1d01f6af98
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 30 additions and 27 deletions
  1. +14
    -10
      peer.go
  2. +2
    -2
      peer_test.go
  3. +8
    -9
      switch.go
  4. +6
    -6
      switch_test.go

+ 14
- 10
peer.go View File

@ -32,7 +32,7 @@ type Peer struct {
Data *cmn.CMap // User data. Data *cmn.CMap // User data.
} }
// PeerConfig is a Peer configuration
// PeerConfig is a Peer configuration.
type PeerConfig struct { type PeerConfig struct {
AuthEnc bool // authenticated encryption AuthEnc bool // authenticated encryption
@ -45,7 +45,8 @@ type PeerConfig struct {
FuzzConfig *FuzzConnConfig FuzzConfig *FuzzConnConfig
} }
func defaultPeerConfig() *PeerConfig {
// DefaultPeerConfig returns the default config.
func DefaultPeerConfig() *PeerConfig {
return &PeerConfig{ return &PeerConfig{
AuthEnc: true, AuthEnc: true,
Fuzz: false, Fuzz: false,
@ -56,18 +57,17 @@ func defaultPeerConfig() *PeerConfig {
} }
} }
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, defaultPeerConfig())
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
} }
func newPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// outbound = true
peer, err := newPeerFromExistingConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, err return nil, err
@ -75,11 +75,15 @@ func newPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs
return peer, nil return peer, nil
} }
func newPeerFromExistingConn(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newPeerFromExistingConnAndConfig(rawConn, outbound, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, defaultPeerConfig())
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) {
return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig())
} }
func newPeerFromExistingConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn := rawConn conn := rawConn
// Fuzz connection // Fuzz connection


+ 2
- 2
peer_test.go View File

@ -35,7 +35,7 @@ func createPeerAndPerformHandshake(addr *NetAddress) (*Peer, error) {
} }
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
pk := crypto.GenPrivKeyEd25519() pk := crypto.GenPrivKeyEd25519()
p, err := newPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk)
p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -81,7 +81,7 @@ func (p *remotePeer) accept(l net.Listener) {
if err != nil { if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err) golog.Fatalf("Failed to accept conn: %+v", err)
} }
peer, err := newPeerFromExistingConn(conn, false, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey)
peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey)
if err != nil { if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err) golog.Fatalf("Failed to create a peer: %+v", err)
} }


+ 8
- 9
switch.go View File

@ -317,7 +317,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
sw.dialing.Set(addr.IP.String(), addr) sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String()) defer sw.dialing.Delete(addr.IP.String())
peer, err := newPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config))
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config))
if err != nil { if err != nil {
log.Info("Failed dialing peer", "address", addr, "error", err) log.Info("Failed dialing peer", "address", addr, "error", err)
return nil, err return nil, err
@ -436,7 +436,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
} }
// New inbound connection! // New inbound connection!
err := sw.AddPeerWithConnectionAndConfig(inConn, false, peerConfigFromGoConfig(sw.config))
err := sw.addPeerWithConnectionAndConfig(inConn, peerConfigFromGoConfig(sw.config))
if err != nil { if err != nil {
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
continue continue
@ -498,14 +498,14 @@ func Connect2Switches(switches []*Switch, i, j int) {
c1, c2 := net.Pipe() c1, c2 := net.Pipe()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
err := switchI.AddPeerWithConnection(c1, false)
err := switchI.addPeerWithConnection(c1)
if PanicOnAddPeerErr && err != nil { if PanicOnAddPeerErr && err != nil {
panic(err) panic(err)
} }
doneCh <- struct{}{} doneCh <- struct{}{}
}() }()
go func() { go func() {
err := switchJ.AddPeerWithConnection(c2, false)
err := switchJ.addPeerWithConnection(c2)
if PanicOnAddPeerErr && err != nil { if PanicOnAddPeerErr && err != nil {
panic(err) panic(err)
} }
@ -540,9 +540,8 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
return s return s
} }
// AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err
@ -556,8 +555,8 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
return nil return nil
} }
func (sw *Switch) AddPeerWithConnectionAndConfig(conn net.Conn, outbound bool, config *PeerConfig) error {
peer, err := newPeerFromExistingConnAndConfig(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err


+ 6
- 6
switch_test.go View File

@ -176,10 +176,10 @@ func TestConnAddrFilter(t *testing.T) {
// connect to good peer // connect to good peer
go func() { go func() {
s1.AddPeerWithConnection(c1, false)
s1.addPeerWithConnection(c1)
}() }()
go func() { go func() {
s2.AddPeerWithConnection(c2, true)
s2.addPeerWithConnection(c2)
}() }()
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
@ -211,10 +211,10 @@ func TestConnPubKeyFilter(t *testing.T) {
// connect to good peer // connect to good peer
go func() { go func() {
s1.AddPeerWithConnection(c1, false)
s1.addPeerWithConnection(c1)
}() }()
go func() { go func() {
s2.AddPeerWithConnection(c2, true)
s2.addPeerWithConnection(c2)
}() }()
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
@ -242,7 +242,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
peer, err := newOutboundPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)
require.Nil(err) require.Nil(err)
@ -268,7 +268,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
peer, err := newOutboundPeer(rp.RemoteAddr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
peer.makePersistent() peer.makePersistent()
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)


Loading…
Cancel
Save