|
|
@ -2,8 +2,8 @@ package p2p |
|
|
|
|
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"encoding/hex" |
|
|
|
"io" |
|
|
|
"sync" |
|
|
|
"testing" |
|
|
|
"time" |
|
|
|
|
|
|
@ -17,30 +17,66 @@ func (s String) WriteTo(w io.Writer) (n int64, err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// convenience method for creating two switches connected to each other.
|
|
|
|
func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBufferSize int, recvQueueCapacity int) (*Switch, *Switch, []*ChannelDescriptor) { |
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
type PeerMessage struct { |
|
|
|
PeerKey string |
|
|
|
Bytes []byte |
|
|
|
Counter int |
|
|
|
} |
|
|
|
|
|
|
|
// Make numChannels channels starting at byte(0x00)
|
|
|
|
chIds := []byte{} |
|
|
|
for i := 0; i < numChannels; i++ { |
|
|
|
chIds = append(chIds, byte(i)) |
|
|
|
type TestReactor struct { |
|
|
|
mtx sync.Mutex |
|
|
|
channels []*ChannelDescriptor |
|
|
|
peersAdded []*Peer |
|
|
|
peersRemoved []*Peer |
|
|
|
logMessages bool |
|
|
|
msgsCounter int |
|
|
|
msgsReceived map[byte][]PeerMessage |
|
|
|
} |
|
|
|
|
|
|
|
func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { |
|
|
|
return &TestReactor{ |
|
|
|
channels: channels, |
|
|
|
logMessages: logMessages, |
|
|
|
msgsReceived: make(map[byte][]PeerMessage), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (tr *TestReactor) GetChannels() []*ChannelDescriptor { |
|
|
|
return tr.channels |
|
|
|
} |
|
|
|
|
|
|
|
func (tr *TestReactor) AddPeer(peer *Peer) { |
|
|
|
tr.mtx.Lock() |
|
|
|
defer tr.mtx.Unlock() |
|
|
|
tr.peersAdded = append(tr.peersAdded, peer) |
|
|
|
} |
|
|
|
|
|
|
|
// Make some channel descriptors.
|
|
|
|
chDescs := []*ChannelDescriptor{} |
|
|
|
for _, chId := range chIds { |
|
|
|
chDescs = append(chDescs, &ChannelDescriptor{ |
|
|
|
Id: chId, |
|
|
|
SendQueueCapacity: sendQueueCapacity, |
|
|
|
RecvBufferSize: recvBufferSize, |
|
|
|
RecvQueueCapacity: recvQueueCapacity, |
|
|
|
DefaultPriority: 1, |
|
|
|
}) |
|
|
|
func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) { |
|
|
|
tr.mtx.Lock() |
|
|
|
defer tr.mtx.Unlock() |
|
|
|
tr.peersRemoved = append(tr.peersRemoved, peer) |
|
|
|
} |
|
|
|
|
|
|
|
func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { |
|
|
|
if tr.logMessages { |
|
|
|
tr.mtx.Lock() |
|
|
|
defer tr.mtx.Unlock() |
|
|
|
//fmt.Printf("Received: %X, %X\n", chId, msgBytes)
|
|
|
|
tr.msgsReceived[chId] = append(tr.msgsReceived[chId], PeerMessage{peer.Key, msgBytes, tr.msgsCounter}) |
|
|
|
tr.msgsCounter++ |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// convenience method for creating two switches connected to each other.
|
|
|
|
func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) { |
|
|
|
|
|
|
|
// Create two switches that will be interconnected.
|
|
|
|
s1 := NewSwitch(chDescs) |
|
|
|
s2 := NewSwitch(chDescs) |
|
|
|
s1 := NewSwitch(reactorsGenerator()) |
|
|
|
s2 := NewSwitch(reactorsGenerator()) |
|
|
|
|
|
|
|
// Create a listener for s1
|
|
|
|
l := NewDefaultListener("tcp", ":8001") |
|
|
@ -67,11 +103,23 @@ func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBu |
|
|
|
// Close the server, no longer needed.
|
|
|
|
l.Stop() |
|
|
|
|
|
|
|
return s1, s2, chDescs |
|
|
|
return s1, s2 |
|
|
|
} |
|
|
|
|
|
|
|
func TestSwitches(t *testing.T) { |
|
|
|
s1, s2, _ := makeSwitchPair(t, 10, 10, 1024, 10) |
|
|
|
s1, s2 := makeSwitchPair(t, func() []Reactor { |
|
|
|
// Make two reactors of two channels each
|
|
|
|
reactors := make([]Reactor, 2) |
|
|
|
reactors[0] = NewTestReactor([]*ChannelDescriptor{ |
|
|
|
&ChannelDescriptor{Id: byte(0x00), Priority: 10}, |
|
|
|
&ChannelDescriptor{Id: byte(0x01), Priority: 10}, |
|
|
|
}, true) |
|
|
|
reactors[1] = NewTestReactor([]*ChannelDescriptor{ |
|
|
|
&ChannelDescriptor{Id: byte(0x02), Priority: 10}, |
|
|
|
&ChannelDescriptor{Id: byte(0x03), Priority: 10}, |
|
|
|
}, true) |
|
|
|
return reactors |
|
|
|
}) |
|
|
|
defer s1.Stop() |
|
|
|
defer s2.Stop() |
|
|
|
|
|
|
@ -83,61 +131,66 @@ func TestSwitches(t *testing.T) { |
|
|
|
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) |
|
|
|
} |
|
|
|
|
|
|
|
// Broadcast a message on ch0
|
|
|
|
s1.Broadcast(byte(0x00), String("channel zero")) |
|
|
|
// Broadcast a message on ch1
|
|
|
|
s1.Broadcast(byte(0x01), String("channel one")) |
|
|
|
// Broadcast a message on ch2
|
|
|
|
s1.Broadcast(byte(0x02), String("channel two")) |
|
|
|
ch0Msg := String("channel zero") |
|
|
|
ch1Msg := String("channel one") |
|
|
|
ch2Msg := String("channel two") |
|
|
|
|
|
|
|
s1.Broadcast(byte(0x00), ch0Msg) |
|
|
|
s1.Broadcast(byte(0x01), ch1Msg) |
|
|
|
s1.Broadcast(byte(0x02), ch2Msg) |
|
|
|
|
|
|
|
// Wait for things to settle...
|
|
|
|
time.Sleep(100 * time.Millisecond) |
|
|
|
|
|
|
|
// Receive message from channel 1 and check
|
|
|
|
inMsg, ok := s2.Receive(byte(0x01)) |
|
|
|
var n int64 |
|
|
|
var err error |
|
|
|
if !ok { |
|
|
|
t.Errorf("Failed to receive from channel one") |
|
|
|
// Check message on ch0
|
|
|
|
ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)] |
|
|
|
if len(ch0Msgs) != 1 { |
|
|
|
t.Errorf("Expected to have received 1 message in ch0") |
|
|
|
} |
|
|
|
if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel one" { |
|
|
|
t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) |
|
|
|
if !bytes.Equal(ch0Msgs[0].Bytes, BinaryBytes(ch0Msg)) { |
|
|
|
t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) |
|
|
|
} |
|
|
|
|
|
|
|
// Receive message from channel 0 and check
|
|
|
|
inMsg, ok = s2.Receive(byte(0x00)) |
|
|
|
if !ok { |
|
|
|
t.Errorf("Failed to receive from channel zero") |
|
|
|
// Check message on ch1
|
|
|
|
ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)] |
|
|
|
if len(ch1Msgs) != 1 { |
|
|
|
t.Errorf("Expected to have received 1 message in ch1") |
|
|
|
} |
|
|
|
if !bytes.Equal(ch1Msgs[0].Bytes, BinaryBytes(ch1Msg)) { |
|
|
|
t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch1Msg), ch1Msgs[0].Bytes) |
|
|
|
} |
|
|
|
if ReadString(bytes.NewBuffer(inMsg.Bytes), &n, &err) != "channel zero" { |
|
|
|
t.Errorf("Unexpected received message bytes:\n%v", hex.Dump(inMsg.Bytes)) |
|
|
|
|
|
|
|
// Check message on ch2
|
|
|
|
ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)] |
|
|
|
if len(ch2Msgs) != 1 { |
|
|
|
t.Errorf("Expected to have received 1 message in ch2") |
|
|
|
} |
|
|
|
if !bytes.Equal(ch2Msgs[0].Bytes, BinaryBytes(ch2Msg)) { |
|
|
|
t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", BinaryBytes(ch2Msg), ch2Msgs[0].Bytes) |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func BenchmarkSwitches(b *testing.B) { |
|
|
|
|
|
|
|
b.StopTimer() |
|
|
|
|
|
|
|
s1, s2, chDescs := makeSwitchPair(b, 10, 10, 1024, 10) |
|
|
|
s1, s2 := makeSwitchPair(b, func() []Reactor { |
|
|
|
// Make two reactors of two channels each
|
|
|
|
reactors := make([]Reactor, 2) |
|
|
|
reactors[0] = NewTestReactor([]*ChannelDescriptor{ |
|
|
|
&ChannelDescriptor{Id: byte(0x00), Priority: 10}, |
|
|
|
&ChannelDescriptor{Id: byte(0x01), Priority: 10}, |
|
|
|
}, false) |
|
|
|
reactors[1] = NewTestReactor([]*ChannelDescriptor{ |
|
|
|
&ChannelDescriptor{Id: byte(0x02), Priority: 10}, |
|
|
|
&ChannelDescriptor{Id: byte(0x03), Priority: 10}, |
|
|
|
}, false) |
|
|
|
return reactors |
|
|
|
}) |
|
|
|
defer s1.Stop() |
|
|
|
defer s2.Stop() |
|
|
|
|
|
|
|
// Create a sink on either channel to just pop off messages.
|
|
|
|
recvRoutine := func(c *Switch, chId byte) { |
|
|
|
for { |
|
|
|
_, ok := c.Receive(chId) |
|
|
|
if !ok { |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Create routines to consume from recvQueues.
|
|
|
|
for _, chDesc := range chDescs { |
|
|
|
go recvRoutine(s1, chDesc.Id) |
|
|
|
go recvRoutine(s2, chDesc.Id) |
|
|
|
} |
|
|
|
|
|
|
|
// Allow time for goroutines to boot up
|
|
|
|
time.Sleep(1000 * time.Millisecond) |
|
|
|
b.StartTimer() |
|
|
@ -146,7 +199,7 @@ func BenchmarkSwitches(b *testing.B) { |
|
|
|
|
|
|
|
// Send random message from one channel to another
|
|
|
|
for i := 0; i < b.N; i++ { |
|
|
|
chId := chDescs[i%len(chDescs)].Id |
|
|
|
chId := byte(i % 4) |
|
|
|
nS, nF := s1.Broadcast(chId, String("test data")) |
|
|
|
numSuccess += nS |
|
|
|
numFailure += nF |
|
|
|