Browse Source

benchmark works, but could use some improvement.

~33k packets/sec for a single local pair.
pull/9/head
Jae Kwon 11 years ago
parent
commit
3e2f377863
11 changed files with 103 additions and 68 deletions
  1. +12
    -0
      common/random.go
  2. +8
    -15
      merkle/iavl_test.go
  3. +4
    -0
      merkle/iavl_tree.go
  4. +1
    -15
      merkle/util.go
  5. +15
    -13
      peer/client.go
  6. +42
    -15
      peer/client_test.go
  7. +6
    -3
      peer/connection.go
  8. +11
    -2
      peer/listener.go
  9. +1
    -1
      peer/log.go
  10. +2
    -1
      peer/peer.go
  11. +1
    -3
      peer/server.go

+ 12
- 0
common/random.go View File

@ -0,0 +1,12 @@
package common
import "crypto/rand"
func RandStr(length int) string {
b := make([]byte, length)
_, err := rand.Read(b)
if err != nil {
return ""
}
return string(b)
}

+ 8
- 15
merkle/iavl_test.go View File

@ -3,29 +3,22 @@ package merkle
import ( import (
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/binary"
"fmt" "fmt"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/db"
"math/rand"
"os"
"runtime" "runtime"
"testing" "testing"
) )
func init() { func init() {
if urandom, err := os.Open("/dev/urandom"); err != nil {
return
} else {
buf := make([]byte, 8)
if _, err := urandom.Read(buf); err == nil {
buf_reader := bytes.NewReader(buf)
if seed, err := binary.ReadVarint(buf_reader); err == nil {
rand.Seed(seed)
}
}
urandom.Close()
}
// TODO: seed rand?
}
func randstr(length int) String {
return String(RandStr(length))
} }
func TestUnit(t *testing.T) { func TestUnit(t *testing.T) {


+ 4
- 0
merkle/iavl_tree.go View File

@ -117,6 +117,10 @@ func (t *IAVLTree) Traverse(cb func(Node) bool) {
func (t *IAVLTree) Values() <-chan Value { func (t *IAVLTree) Values() <-chan Value {
root := t.root root := t.root
ch := make(chan Value) ch := make(chan Value)
if root == nil {
close(ch)
return ch
}
go func() { go func() {
root.traverse(t.db, func(n Node) bool { root.traverse(t.db, func(n Node) bool {
if n.Height() == 0 { if n.Height() == 0 {


+ 1
- 15
merkle/util.go View File

@ -3,8 +3,8 @@ package merkle
import ( import (
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
"os"
) )
/* /*
@ -61,20 +61,6 @@ func printIAVLNode(node *IAVLNode, indent int) {
} }
func randstr(length int) String {
if urandom, err := os.Open("/dev/urandom"); err != nil {
panic(err)
} else {
slice := make([]byte, length)
if _, err := urandom.Read(slice); err != nil {
panic(err)
}
urandom.Close()
return String(slice)
}
panic("unreachable")
}
func maxUint8(a, b uint8) uint8 { func maxUint8(a, b uint8) uint8 {
if a > b { if a > b {
return a return a


+ 15
- 13
peer/client.go View File

@ -2,11 +2,12 @@ package peer
import ( import (
"errors" "errors"
"sync"
"sync/atomic"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/merkle"
"sync"
"sync/atomic"
) )
/* Client /* Client
@ -99,20 +100,23 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer,
return peer, nil return peer, nil
} }
func (c *Client) Broadcast(pkt Packet) {
func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) {
if atomic.LoadUint32(&c.stopped) == 1 { if atomic.LoadUint32(&c.stopped) == 1 {
return return
} }
log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
for v := range c.Peers().Values() {
for v := range c.peers.Values() {
peer := v.(*Peer) peer := v.(*Peer)
success := peer.TrySend(pkt) success := peer.TrySend(pkt)
log.Tracef("Broadcast for peer %v success: %v", peer, success) log.Tracef("Broadcast for peer %v success: %v", peer, success)
if !success {
// TODO: notify the peer
if success {
numSuccess += 1
} else {
numFailure += 1
} }
} }
return
} }
@ -128,13 +132,11 @@ func (c *Client) Receive(chName String) *InboundPacket {
Panicf("Expected recvQueues[%f], found none", chName) Panicf("Expected recvQueues[%f], found none", chName)
} }
for {
select {
case <-c.quit:
return nil
case inPacket := <-q:
return inPacket
}
select {
case <-c.quit:
return nil
case inPacket := <-q:
return inPacket
} }
} }


+ 42
- 15
peer/client_test.go View File

@ -1,19 +1,20 @@
package peer package peer
import ( import (
. "github.com/tendermint/tendermint/binary"
"testing" "testing"
"time" "time"
. "github.com/tendermint/tendermint/binary"
) )
// convenience method for creating two clients connected to each other. // convenience method for creating two clients connected to each other.
func makeClientPair(t *testing.T, bufferSize int, channels []string) (*Client, *Client) {
func makeClientPair(t testing.TB, bufferSize int, channels []String) (*Client, *Client) {
peerMaker := func(conn *Connection) *Peer { peerMaker := func(conn *Connection) *Peer {
p := NewPeer(conn) p := NewPeer(conn)
p.channels = map[String]*Channel{} p.channels = map[String]*Channel{}
for chName := range channels {
p.channels[String(chName)] = NewChannel(String(chName), bufferSize)
for _, chName := range channels {
p.channels[chName] = NewChannel(chName, bufferSize)
} }
return p return p
} }
@ -39,12 +40,18 @@ func makeClientPair(t *testing.T, bufferSize int, channels []string) (*Client, *
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Close the server, no longer needed.
s1.Stop()
return c1, c2 return c1, c2
} }
func TestClients(t *testing.T) { func TestClients(t *testing.T) {
c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"})
channels := []String{"ch1", "ch2", "ch3"}
c1, c2 := makeClientPair(t, 10, channels)
defer c1.Stop()
defer c2.Stop()
// Lets send a message from c1 to c2. // Lets send a message from c1 to c2.
if c1.Peers().Size() != 1 { if c1.Peers().Size() != 1 {
@ -75,31 +82,51 @@ func TestClients(t *testing.T) {
if string(inMsg.Bytes) != "channel one" { if string(inMsg.Bytes) != "channel one" {
t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes))
} }
s1.Stop()
c2.Stop()
} }
func BenchmarkClients(b *testing.B) { func BenchmarkClients(b *testing.B) {
b.StopTimer() b.StopTimer()
// TODO: benchmark the random functions, which is faster?
c1, c2 := makeClientPair(t, 10, []string{"ch1", "ch2", "ch3"})
channels := []String{"ch1", "ch2", "ch3"}
c1, c2 := makeClientPair(b, 10, channels)
defer c1.Stop()
defer c2.Stop()
// Create a sink on either channel to just pop off messages. // Create a sink on either channel to just pop off messages.
// TODO: ensure that when clients stop, this goroutine stops.
recvHandler := func(c *Client) {
recvHandler := func(c *Client, chName String) {
for {
it := c.Receive(chName)
if it == nil {
break
}
}
} }
go recvHandler(c1)
go recvHandler(c2)
for _, chName := range channels {
go recvHandler(c1, chName)
go recvHandler(c2, chName)
}
// Allow time for goroutines to boot up
time.Sleep(1000 * time.Millisecond)
b.StartTimer() b.StartTimer()
numSuccess, numFailure := 0, 0
// Send random message from one channel to another // Send random message from one channel to another
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
chName := channels[i%len(channels)]
pkt := NewPacket(chName, ByteSlice("test data"))
nS, nF := c1.Broadcast(pkt)
numSuccess += nS
numFailure += nF
} }
log.Warnf("success: %v, failure: %v", numSuccess, numFailure)
// Allow everything to flush before stopping clients & closing connections.
b.StopTimer()
time.Sleep(1000 * time.Millisecond)
} }

+ 6
- 3
peer/connection.go View File

@ -2,11 +2,12 @@ package peer
import ( import (
"fmt" "fmt"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"net" "net"
"sync/atomic" "sync/atomic"
"time" "time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
) )
const ( const (
@ -115,12 +116,14 @@ FOR_LOOP:
break FOR_LOOP break FOR_LOOP
} }
if atomic.LoadUint32(&c.stopped) == 1 {
break FOR_LOOP
}
if err != nil { if err != nil {
log.Infof("%v failed @ sendHandler:\n%v", c, err) log.Infof("%v failed @ sendHandler:\n%v", c, err)
c.Stop() c.Stop()
break FOR_LOOP break FOR_LOOP
} }
c.flush() c.flush()
} }


+ 11
- 2
peer/listener.go View File

@ -1,9 +1,10 @@
package peer package peer
import ( import (
. "github.com/tendermint/tendermint/common"
"net" "net"
"sync/atomic" "sync/atomic"
. "github.com/tendermint/tendermint/common"
) )
const ( const (
@ -51,7 +52,7 @@ func (l *DefaultListener) listenHandler() {
conn, err := l.listener.Accept() conn, err := l.listener.Accept()
if atomic.LoadUint32(&l.stopped) == 1 { if atomic.LoadUint32(&l.stopped) == 1 {
return
break // go to cleanup
} }
// listener wasn't stopped, // listener wasn't stopped,
@ -104,21 +105,29 @@ func GetLocalAddress() *NetAddress {
// UPNP external address discovery & port mapping // UPNP external address discovery & port mapping
// TODO: more flexible internal & external ports // TODO: more flexible internal & external ports
func GetUPNPLocalAddress() *NetAddress { func GetUPNPLocalAddress() *NetAddress {
// XXX remove nil, create option for specifying address.
// removed because this takes too long.
return nil
log.Infof("Getting UPNP local address")
nat, err := Discover() nat, err := Discover()
if err != nil { if err != nil {
log.Infof("Could not get UPNP local address: %v", err)
return nil return nil
} }
ext, err := nat.GetExternalAddress() ext, err := nat.GetExternalAddress()
if err != nil { if err != nil {
log.Infof("Could not get UPNP local address: %v", err)
return nil return nil
} }
_, err = nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0) _, err = nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0)
if err != nil { if err != nil {
log.Infof("Could not get UPNP local address: %v", err)
return nil return nil
} }
log.Infof("Got UPNP local address: %v", ext)
return NewNetAddressIPPort(ext, DEFAULT_PORT) return NewNetAddressIPPort(ext, DEFAULT_PORT)
} }


+ 1
- 1
peer/log.go View File

@ -9,7 +9,7 @@ var log seelog.LoggerInterface
func init() { func init() {
// TODO: replace with configuration file in the ~/.tendermint directory. // TODO: replace with configuration file in the ~/.tendermint directory.
config := ` config := `
<seelog type="sync">
<seelog type="asyncloop" minlevel="debug">
<outputs formatid="colored"> <outputs formatid="colored">
<console/> <console/>
</outputs> </outputs>


+ 2
- 1
peer/peer.go View File

@ -2,11 +2,12 @@ package peer
import ( import (
"fmt" "fmt"
. "github.com/tendermint/tendermint/binary"
"io" "io"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
. "github.com/tendermint/tendermint/binary"
) )
/* Peer */ /* Peer */


+ 1
- 3
peer/server.go View File

@ -1,7 +1,5 @@
package peer package peer
import ()
/* Server */ /* Server */
type Server struct { type Server struct {
@ -31,8 +29,8 @@ func (s *Server) IncomingConnectionHandler() {
} }
} }
// stops the server, not the client.
func (s *Server) Stop() { func (s *Server) Stop() {
log.Infof("Stopping server") log.Infof("Stopping server")
s.listener.Stop() s.listener.Stop()
s.client.Stop()
} }

Loading…
Cancel
Save