Browse Source

Merge pull request #6 from tendermint/develop

filter conn by addr/pubkey. closes #3
pull/456/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
f508f3f20b
3 changed files with 116 additions and 14 deletions
  1. +37
    -0
      switch.go
  2. +78
    -13
      switch_test.go
  3. +1
    -1
      version.go

+ 37
- 0
switch.go View File

@ -65,6 +65,9 @@ type Switch struct {
dialing *CMap dialing *CMap
nodeInfo *NodeInfo // our node info nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey nodePrivKey crypto.PrivKeyEd25519 // our node privkey
filterConnByAddr func(net.Addr) error
filterConnByPubKey func(crypto.PubKeyEd25519) error
} }
var ( var (
@ -192,6 +195,11 @@ func (sw *Switch) OnStop() {
// NOTE: This performs a blocking handshake before the peer is added. // NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
// Filter by ip
if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
return nil, err
}
// Set deadline for handshake so we don't block forever on conn.ReadFull // Set deadline for handshake so we don't block forever on conn.ReadFull
conn.SetDeadline(time.Now().Add( conn.SetDeadline(time.Now().Add(
time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second)) time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
@ -206,6 +214,12 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
return nil, err return nil, err
} }
} }
// Filter by p2p-key
if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
return nil, err
}
// Then, perform node handshake // Then, perform node handshake
peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo) peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
if err != nil { if err != nil {
@ -251,6 +265,29 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
return peer, nil return peer, nil
} }
func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
if sw.filterConnByAddr != nil {
return sw.filterConnByAddr(addr)
}
return nil
}
func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
if sw.filterConnByPubKey != nil {
return sw.filterConnByPubKey(pubkey)
}
return nil
}
func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
sw.filterConnByAddr = f
}
func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
sw.filterConnByPubKey = f
}
func (sw *Switch) startInitPeer(peer *Peer) { func (sw *Switch) startInitPeer(peer *Peer) {
peer.Start() // spawn send/recv routines peer.Start() // spawn send/recv routines
sw.addPeerToReactors(peer) // run AddPeer on each reactor sw.addPeerToReactors(peer) // run AddPeer on each reactor


+ 78
- 13
switch_test.go View File

@ -2,6 +2,7 @@ package p2p
import ( import (
"bytes" "bytes"
"fmt"
"net" "net"
"sync" "sync"
"testing" "testing"
@ -9,6 +10,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
@ -92,23 +94,24 @@ func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switc
return switches[0], switches[1] return switches[0], switches[1]
} }
func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{ID: byte(0x00), Priority: 10},
&ChannelDescriptor{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{ID: byte(0x02), Priority: 10},
&ChannelDescriptor{ID: byte(0x03), Priority: 10},
}, true))
return sw
}
func TestSwitches(t *testing.T) { func TestSwitches(t *testing.T) {
s1, s2 := makeSwitchPair(t, func(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{ID: byte(0x00), Priority: 10},
&ChannelDescriptor{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{ID: byte(0x02), Priority: 10},
&ChannelDescriptor{ID: byte(0x03), Priority: 10},
}, true))
return sw
})
s1, s2 := makeSwitchPair(t, initSwitchFunc)
defer s1.Stop() defer s1.Stop()
defer s2.Stop() defer s2.Stop()
// Lets send a message from s1 to s2.
if s1.Peers().Size() != 1 { if s1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size()) t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
} }
@ -116,6 +119,7 @@ func TestSwitches(t *testing.T) {
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
} }
// Lets send some messages
ch0Msg := "channel zero" ch0Msg := "channel zero"
ch1Msg := "channel foo" ch1Msg := "channel foo"
ch2Msg := "channel bar" ch2Msg := "channel bar"
@ -156,6 +160,67 @@ func TestSwitches(t *testing.T) {
} }
func TestConnAddrFilter(t *testing.T) {
s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
c1, c2 := net.Pipe()
s1.SetAddrFilter(func(addr net.Addr) error {
if addr.String() == c1.RemoteAddr().String() {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
// connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
// Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4))
defer s1.Stop()
defer s2.Stop()
if s1.Peers().Size() != 0 {
t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
}
if s2.Peers().Size() != 0 {
t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
}
}
func TestConnPubKeyFilter(t *testing.T) {
s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
c1, c2 := net.Pipe()
// set pubkey filter
s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
// connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
// Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4))
defer s1.Stop()
defer s2.Stop()
if s1.Peers().Size() != 0 {
t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
}
if s2.Peers().Size() != 0 {
t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
}
}
func BenchmarkSwitches(b *testing.B) { func BenchmarkSwitches(b *testing.B) {
b.StopTimer() b.StopTimer()


+ 1
- 1
version.go View File

@ -1,3 +1,3 @@
package p2p package p2p
const Version = "0.3.3" // fuzz conn
const Version = "0.3.4" // filter by addr or pubkey

Loading…
Cancel
Save