Browse Source

[tm-bench] use plain ws connection instead of go-rpc (Refs #9, Fixes #14)

Also:
- use go-kit/log

Reasoning:
I found that we just don't need this layer of abstraction here. Plus
go-rpc is buggy and lacking proper closing of ws connections.
pull/1943/head
Anton Kaliaev 8 years ago
parent
commit
08166b05f5
No known key found for this signature in database GPG Key ID: 7B6881D965918214
3 changed files with 136 additions and 35 deletions
  1. +4
    -0
      tm-bench/glide.yaml
  2. +25
    -2
      tm-bench/main.go
  3. +107
    -33
      tm-bench/transacter.go

+ 4
- 0
tm-bench/glide.yaml View File

@ -8,3 +8,7 @@ import:
- package: github.com/tendermint/tools - package: github.com/tendermint/tools
subpackages: subpackages:
- tm-monitor/monitor - tm-monitor/monitor
- package: github.com/go-kit/kit
subpackages:
- log
- term

+ 25
- 2
tm-bench/main.go View File

@ -8,6 +8,8 @@ import (
"text/tabwriter" "text/tabwriter"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/term"
metrics "github.com/rcrowley/go-metrics" metrics "github.com/rcrowley/go-metrics"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
"github.com/tendermint/tools/tm-monitor/monitor" "github.com/tendermint/tools/tm-monitor/monitor"
@ -15,6 +17,8 @@ import (
var version = "0.1.0.pre" var version = "0.1.0.pre"
var logger = log.NewNopLogger()
type statistics struct { type statistics struct {
BlockTimeSample metrics.Histogram BlockTimeSample metrics.Histogram
TxThroughputSample metrics.Histogram TxThroughputSample metrics.Histogram
@ -23,10 +27,12 @@ type statistics struct {
func main() { func main() {
var duration, txsRate, connections int var duration, txsRate, connections int
var verbose bool
flag.IntVar(&connections, "c", 1, "Connections to keep open per endpoint") flag.IntVar(&connections, "c", 1, "Connections to keep open per endpoint")
flag.IntVar(&duration, "T", 10, "Exit after the specified amount of time in seconds") flag.IntVar(&duration, "T", 10, "Exit after the specified amount of time in seconds")
flag.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection") flag.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection")
flag.BoolVar(&verbose, "v", false, "Verbose output")
flag.Usage = func() { flag.Usage = func() {
fmt.Println(`Tendermint blockchain benchmarking tool. fmt.Println(`Tendermint blockchain benchmarking tool.
@ -47,6 +53,19 @@ Examples:
os.Exit(1) os.Exit(1)
} }
if verbose {
// Color errors red
colorFn := func(keyvals ...interface{}) term.FgBgColor {
for i := 1; i < len(keyvals); i += 2 {
if _, ok := keyvals[i].(error); ok {
return term.FgBgColor{Fg: term.White, Bg: term.Red}
}
}
return term.FgBgColor{}
}
logger = term.NewLogger(os.Stdout, log.NewLogfmtLogger, colorFn)
}
fmt.Printf("Running %ds test @ %s\n", duration, flag.Arg(0)) fmt.Printf("Running %ds test @ %s\n", duration, flag.Arg(0))
endpoints := strings.Split(flag.Arg(0), ",") endpoints := strings.Split(flag.Arg(0), ",")
@ -104,10 +123,12 @@ func startNodes(endpoints []string, blockCh chan<- tmtypes.Header, blockLatencyC
for i, e := range endpoints { for i, e := range endpoints {
n := monitor.NewNode(e) n := monitor.NewNode(e)
n.SetLogger(log.With(logger, "node", e))
n.SendBlocksTo(blockCh) n.SendBlocksTo(blockCh)
n.SendBlockLatenciesTo(blockLatencyCh) n.SendBlockLatenciesTo(blockLatencyCh)
if err := n.Start(); err != nil { if err := n.Start(); err != nil {
panic(err)
fmt.Println(err)
os.Exit(1)
} }
nodes[i] = n nodes[i] = n
} }
@ -120,8 +141,10 @@ func startTransacters(endpoints []string, connections int, txsRate int) []*trans
for i, e := range endpoints { for i, e := range endpoints {
t := newTransacter(e, connections, txsRate) t := newTransacter(e, connections, txsRate)
t.SetLogger(logger)
if err := t.Start(); err != nil { if err := t.Start(); err != nil {
panic(err)
fmt.Println(err)
os.Exit(1)
} }
transacters[i] = t transacters[i] = t
} }


+ 107
- 33
tm-bench/transacter.go View File

@ -5,103 +5,177 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http"
"net/url"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
rpcclient "github.com/tendermint/go-rpc/client"
rpctypes "github.com/tendermint/go-rpc/types" rpctypes "github.com/tendermint/go-rpc/types"
) )
const (
sendTimeout = 500 * time.Millisecond
// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
pingPeriod = (30 * 9 / 10) * time.Second
)
type transacter struct { type transacter struct {
Target string Target string
Rate int Rate int
Connections int Connections int
conns []*rpcclient.WSClient
conns []*websocket.Conn
wg sync.WaitGroup wg sync.WaitGroup
stopped bool stopped bool
logger log.Logger
} }
func newTransacter(target string, connections int, rate int) *transacter { func newTransacter(target string, connections int, rate int) *transacter {
conns := make([]*rpcclient.WSClient, connections)
for i := 0; i < connections; i++ {
conns[i] = rpcclient.NewWSClient(target, "/websocket")
}
return &transacter{ return &transacter{
Target: target, Target: target,
Rate: rate, Rate: rate,
Connections: connections, Connections: connections,
conns: conns,
conns: make([]*websocket.Conn, connections),
logger: log.NewNopLogger(),
} }
} }
// SetLogger lets you set your own logger
func (t *transacter) SetLogger(l log.Logger) {
t.logger = l
}
// Start opens N = `t.Connections` connections to the target and creates read
// and write goroutines for each connection.
func (t *transacter) Start() error { func (t *transacter) Start() error {
t.stopped = false t.stopped = false
for _, c := range t.conns {
if _, err := c.Start(); err != nil {
for i := 0; i < t.Connections; i++ {
c, _, err := connect(t.Target)
if err != nil {
return err return err
} }
t.conns[i] = c
} }
t.wg.Add(t.Connections)
t.wg.Add(2 * t.Connections)
for i := 0; i < t.Connections; i++ { for i := 0; i < t.Connections; i++ {
go t.sendLoop(i) go t.sendLoop(i)
go t.receiveLoop(i)
} }
return nil return nil
} }
// Stop closes the connections.
func (t *transacter) Stop() { func (t *transacter) Stop() {
t.stopped = true t.stopped = true
t.wg.Wait() t.wg.Wait()
for _, c := range t.conns { for _, c := range t.conns {
c.Stop()
c.Close()
}
}
// receiveLoop reads messages from the connection (empty in case of
// `broadcast_tx_async`).
func (t *transacter) receiveLoop(connIndex int) {
c := t.conns[connIndex]
defer t.wg.Done()
for {
_, _, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
t.logger.Log("err", errors.Wrap(err, "failed to read response"))
}
return
}
if t.stopped {
return
}
} }
} }
// sendLoop generates transactions at a given rate.
func (t *transacter) sendLoop(connIndex int) { func (t *transacter) sendLoop(connIndex int) {
conn := t.conns[connIndex]
c := t.conns[connIndex]
logger := log.With(t.logger, "addr", c.RemoteAddr())
var txNumber = 0
pingsTicker := time.NewTicker(pingPeriod)
txsTicker := time.NewTicker(1 * time.Second)
defer func() {
pingsTicker.Stop()
txsTicker.Stop()
t.wg.Done()
}()
var num = 0
for { for {
startTime := time.Now()
select {
case <-txsTicker.C:
startTime := time.Now()
for i := 0; i < t.Rate; i++ {
// each transaction embeds connection index and tx number
tx := generateTx(connIndex, txNumber)
c.SetWriteDeadline(time.Now().Add(sendTimeout))
err := c.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "broadcast_tx_async",
Params: []interface{}{hex.EncodeToString(tx)},
})
if err != nil {
fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed"))
os.Exit(1)
}
txNumber++
}
for i := 0; i < t.Rate; i++ {
if t.stopped {
t.wg.Done()
return
timeToSend := time.Now().Sub(startTime)
time.Sleep(time.Second - timeToSend)
logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
case <-pingsTicker.C:
// Right now go-rpc server closes the connection in the absence of pings
c.SetWriteDeadline(time.Now().Add(sendTimeout))
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
logger.Log("err", errors.Wrap(err, "failed to write ping message"))
} }
}
tx := generateTx(connIndex, num)
err := conn.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "broadcast_tx_async",
Params: []interface{}{hex.EncodeToString(tx)},
})
if t.stopped {
// To cleanly close a connection, a client should send a close
// frame and wait for the server to close the connection.
c.SetWriteDeadline(time.Now().Add(sendTimeout))
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil { if err != nil {
fmt.Printf("Lost connection to %s. Please restart the test.\nDetails:\n%v", conn.Address, err)
os.Exit(1)
logger.Log("err", errors.Wrap(err, "failed to write close message"))
} }
num++
}
timeToSend := time.Now().Sub(startTime)
time.Sleep(time.Second - timeToSend)
return
}
} }
} }
func connect(host string) (*websocket.Conn, *http.Response, error) {
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
return websocket.DefaultDialer.Dial(u.String(), nil)
}
func generateTx(a int, b int) []byte { func generateTx(a int, b int) []byte {
tx := make([]byte, 250) tx := make([]byte, 250)
binary.PutUvarint(tx[:32], uint64(a)) binary.PutUvarint(tx[:32], uint64(a))
binary.PutUvarint(tx[32:64], uint64(b)) binary.PutUvarint(tx[32:64], uint64(b))
if _, err := rand.Read(tx[234:]); err != nil { if _, err := rand.Read(tx[234:]); err != nil {
panic(errors.Wrap(err, "err reading from crypto/rand"))
panic(errors.Wrap(err, "failed to generate transaction"))
} }
return tx return tx
} }

Loading…
Cancel
Save