From 68b07b9c97164a3825bdb597e657942172bdea50 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 17 Mar 2017 13:13:06 +0400 Subject: [PATCH] connections flag --- tm-bench/README.md | 2 +- tm-bench/main.go | 11 ++++--- tm-bench/transacter.go | 71 ++++++++++++++++++++++++++---------------- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/tm-bench/README.md b/tm-bench/README.md index ec78b965d..810951499 100644 --- a/tm-bench/README.md +++ b/tm-bench/README.md @@ -4,7 +4,7 @@ core](https://github.com/tendermint/tendermint) nodes. ``` -λ tm-bench -T10 -r1000 localhost:46657 +λ tm-bench -T 10 -r 1000 localhost:46657 Stats Avg Stdev Max Block latency 6.18ms 3.19ms 14ms Blocks/sec 0.828 0.378 1 diff --git a/tm-bench/main.go b/tm-bench/main.go index 993a521d3..fbeac5c7a 100644 --- a/tm-bench/main.go +++ b/tm-bench/main.go @@ -22,8 +22,9 @@ type statistics struct { } func main() { - var duration, txsRate int + var duration, txsRate, connections int + flag.IntVar(&connections, "c", 1, "Connections to keep open per endpoint") flag.IntVar(&duration, "T", 10, "Exit after the specified amount of time in seconds") flag.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection") @@ -31,7 +32,7 @@ func main() { fmt.Println(`Tendermint blockchain benchmarking tool. Usage: - tm-bench [-T 10] [-r 1000] [endpoints] + tm-bench [-c 1] [-T 10] [-r 1000] [endpoints] Examples: tm-bench localhost:46657`) @@ -55,7 +56,7 @@ Examples: nodes := startNodes(endpoints, blockCh, blockLatencyCh) - transacters := startTransacters(endpoints, txsRate) + transacters := startTransacters(endpoints, connections, txsRate) stats := &statistics{ BlockTimeSample: metrics.NewHistogram(metrics.NewUniformSample(1000)), @@ -114,11 +115,11 @@ func startNodes(endpoints []string, blockCh chan<- tmtypes.Header, blockLatencyC return nodes } -func startTransacters(endpoints []string, txsRate int) []*transacter { +func startTransacters(endpoints []string, connections int, txsRate int) []*transacter { transacters := make([]*transacter, len(endpoints)) for i, e := range endpoints { - t := newTransacter(e, txsRate) + t := newTransacter(e, connections, txsRate) if err := t.Start(); err != nil { panic(err) } diff --git a/tm-bench/transacter.go b/tm-bench/transacter.go index 5500a0979..cdfee0958 100644 --- a/tm-bench/transacter.go +++ b/tm-bench/transacter.go @@ -15,72 +15,89 @@ import ( ) type transacter struct { - Target string - Rate int + Target string + Rate int + Connections int - wsc *rpcclient.WSClient - stopped bool + conns []*rpcclient.WSClient wg sync.WaitGroup + stopped bool } -func newTransacter(target string, rate int) *transacter { +func newTransacter(target string, connections int, rate int) *transacter { + conns := make([]*rpcclient.WSClient, connections) + for i := 0; i < connections; i++ { + conns[i] = rpcclient.NewWSClient(target, "/websocket") + } + return &transacter{ - Target: target, - Rate: rate, - wsc: rpcclient.NewWSClient(target, "/websocket"), + Target: target, + Rate: rate, + Connections: connections, + conns: conns, } } func (t *transacter) Start() error { t.stopped = false - if _, err := t.wsc.Start(); err != nil { - return err + + for _, c := range t.conns { + if _, err := c.Start(); err != nil { + return err + } + } + + for i := 0; i < t.Connections; i++ { + t.wg.Add(1) + go t.sendLoop(i) } - t.wg.Add(1) - go t.sendLoop() + return nil } func (t *transacter) Stop() { t.stopped = true t.wg.Wait() - t.wsc.Stop() + for _, c := range t.conns { + c.Stop() + } } -func (t *transacter) sendLoop() { - var num = 0 +func (t *transacter) sendLoop(connIndex int) { + conn := t.conns[connIndex] + var num = 0 for { startTime := time.Now() + for i := 0; i < t.Rate; i++ { - tx := generateTx(num) - err := t.wsc.WriteJSON(rpctypes.RPCRequest{ + if t.stopped { + t.wg.Done() + return + } + + tx := generateTx(connIndex, num) + err := conn.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", ID: "", Method: "broadcast_tx_async", Params: []interface{}{hex.EncodeToString(tx)}, }) if err != nil { - panic(errors.Wrap(err, fmt.Sprintf("lost connection to %s", t.Target))) + panic(errors.Wrap(err, fmt.Sprintf("lost connection to %s", conn.Address))) } num++ } - if t.stopped { - t.wg.Done() - return - } - timeToSend := time.Now().Sub(startTime) time.Sleep(time.Second - timeToSend) } } -// generateTx returns a random byte sequence where first 8 bytes are the number -// of transaction. -func generateTx(num int) []byte { +func generateTx(a int, b int) []byte { tx := make([]byte, 250) - binary.PutUvarint(tx[:32], uint64(num)) + binary.PutUvarint(tx[:32], uint64(a)) + binary.PutUvarint(tx[32:64], uint64(b)) if _, err := rand.Read(tx[234:]); err != nil { panic(errors.Wrap(err, "err reading from crypto/rand")) }