- tm-bench has a deprecation warning for 5 releases now, with the major release coming I removed the file and updated the docs to point to `tm-load-test` located in the interchainio repo Signed-off-by: Marko Baricevic <marbar3778@yahoo.com>pull/4174/head
@ -1,81 +1,4 @@ | |||
# tm-bench | |||
# tm-load-test | |||
**Deprecation Warning**: please use [tm-load-test](https://github.com/interchainio/tm-load-test) | |||
Tendermint blockchain benchmarking tool: | |||
- [https://github.com/tendermint/tendermint/tree/master/tools/tm-bench](https://github.com/tendermint/tendermint/tree/master/tools/tm-bench) | |||
For example, the following: | |||
``` | |||
tm-bench -T 10 -r 1000 localhost:26657 | |||
``` | |||
will output: | |||
``` | |||
Stats Avg StdDev Max Total | |||
Txs/sec 818 532 1549 9000 | |||
Blocks/sec 0.818 0.386 1 9 | |||
``` | |||
## Quick Start | |||
[Install Tendermint](../introduction/install.md) | |||
This currently is setup to work on tendermint's develop branch. Please ensure | |||
you are on that. (If not, update `tendermint` and `tmlibs` in gopkg.toml to use | |||
the master branch.) | |||
then run: | |||
``` | |||
tendermint init | |||
tendermint node --proxy_app=kvstore | |||
``` | |||
``` | |||
tm-bench localhost:26657 | |||
``` | |||
with the last command being in a seperate window. | |||
## Usage | |||
``` | |||
tm-bench [-c 1] [-T 10] [-r 1000] [-s 250] [endpoints] | |||
Examples: | |||
tm-bench localhost:26657 | |||
Flags: | |||
-T int | |||
Exit after the specified amount of time in seconds (default 10) | |||
-c int | |||
Connections to keep open per endpoint (default 1) | |||
-r int | |||
Txs per second to send in a connection (default 1000) | |||
-s int | |||
Size per tx in bytes | |||
-v Verbose output | |||
``` | |||
## How stats are collected | |||
These stats are derived by having each connection send transactions at the | |||
specified rate (or as close as it can get) for the specified time. After the | |||
specified time, it iterates over all of the blocks that were created in that | |||
time. The average and stddev per second are computed based off of that, by | |||
grouping the data by second. | |||
To send transactions at the specified rate in each connection, we loop | |||
through the number of transactions. If its too slow, the loop stops at one second. | |||
If its too fast, we wait until the one second mark ends. The transactions per | |||
second stat is computed based off of what ends up in the block. | |||
Each of the connections is handled via two separate goroutines. | |||
## Development | |||
``` | |||
make test | |||
``` | |||
Tm-bench has been removed in favor of [tm-load-test](https://github.com/interchainio/tm-load-test). | |||
for information on how to use tm-load-test please visit the docs: https://github.com/interchainio/tm-load-test |
@ -1,6 +0,0 @@ | |||
FROM alpine:3.8 | |||
WORKDIR /app | |||
COPY tm-bench /app/tm-bench | |||
ENTRYPOINT ["./tm-bench"] |
@ -1,11 +0,0 @@ | |||
FROM golang:latest | |||
RUN mkdir -p /go/src/github.com/tendermint/tendermint/tools/tm-bench | |||
WORKDIR /go/src/github.com/tendermint/tendermint/tools/tm-bench | |||
COPY Makefile /go/src/github.com/tendermint/tendermint/tools/tm-bench/ | |||
RUN make tools | |||
COPY . /go/src/github.com/tendermint/tendermint/tools/tm-bench | |||
@ -1,50 +0,0 @@ | |||
DIST_DIRS := find * -type d -exec | |||
VERSION := $(shell perl -ne '/^TMCoreSemVer = "([^"]+)"$$/ && print "v$$1\n"' ../../version/version.go) | |||
all: build test install | |||
######################################## | |||
### Build | |||
build: | |||
@go build | |||
install: | |||
@go install | |||
test: | |||
@go test -race | |||
build-all: | |||
rm -rf ./dist | |||
gox -verbose \ | |||
-ldflags "-s -w" \ | |||
-arch="amd64 386 arm arm64" \ | |||
-os="linux darwin windows freebsd" \ | |||
-osarch="!darwin/arm !darwin/arm64" \ | |||
-output="dist/{{.OS}}-{{.Arch}}/{{.Dir}}" . | |||
dist: build-all | |||
cd dist && \ | |||
$(DIST_DIRS) cp ../LICENSE {} \; && \ | |||
$(DIST_DIRS) cp ../README.rst {} \; && \ | |||
$(DIST_DIRS) tar -zcf tm-bench-${VERSION}-{}.tar.gz {} \; && \ | |||
shasum -a256 ./*.tar.gz > "./tm-bench_${VERSION}_SHA256SUMS" && \ | |||
cd .. | |||
######################################## | |||
### Docker | |||
build-docker: | |||
rm -f ./tm-bench | |||
docker run -it --rm -v "$(PWD)/../../:/go/src/github.com/tendermint/tendermint" -w "/go/src/github.com/tendermint/tendermint/tools/tm-bench" -e "CGO_ENABLED=0" golang:alpine go build -ldflags "-s -w" -o tm-bench | |||
docker build -t "tendermint/bench" . | |||
clean: | |||
rm -f ./tm-bench | |||
rm -rf ./dist | |||
# To avoid unintended conflicts with file names, always add to .PHONY | |||
# unless there is a reason not to. | |||
# https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html | |||
.PHONY: build install test build-all dist build-docker clean |
@ -1,108 +0,0 @@ | |||
# tm-bench (Deprecated) | |||
> ## **Deprecation Warning** | |||
### This tool will be depreacted in favor of [tm-load-test](https://github.com/interchainio/tm-load-test). | |||
Tendermint blockchain benchmarking tool: | |||
- [https://github.com/tendermint/tendermint/tree/master/tools/tm-bench](https://github.com/tendermint/tendermint/tree/master/tools/tm-bench) | |||
For example, the following: `tm-bench -T 30 -r 10000 localhost:26657` | |||
will output: | |||
``` | |||
Stats Avg StdDev Max Total | |||
Txs/sec 3981 1993 5000 119434 | |||
Blocks/sec 0.800 0.400 1 24 | |||
``` | |||
NOTE: **tm-bench only works with build-in `kvstore` ABCI application**. For it | |||
to work with your application, you will need to modify `generateTx` function. | |||
In the future, we plan to support scriptable transactions (see | |||
[\#1938](https://github.com/tendermint/tendermint/issues/1938)). | |||
## Quick Start | |||
### Docker | |||
``` | |||
docker run -it --rm -v "/tmp:/tendermint" tendermint/tendermint init | |||
docker run -it --rm -v "/tmp:/tendermint" -p "26657:26657" --name=tm tendermint/tendermint node --proxy_app=kvstore | |||
docker run -it --rm --link=tm tendermint/bench tm:26657 | |||
``` | |||
### Using binaries | |||
[Install Tendermint](https://github.com/tendermint/tendermint#install) | |||
then run: | |||
``` | |||
tendermint init | |||
tendermint node --proxy_app=kvstore | |||
tm-bench localhost:26657 | |||
``` | |||
with the last command being in a separate window. | |||
## Usage | |||
``` | |||
Tendermint blockchain benchmarking tool. | |||
Usage: | |||
tm-bench [-c 1] [-T 10] [-r 1000] [-s 250] [endpoints] [-output-format <plain|json> [-broadcast-tx-method <async|sync|commit>]] | |||
Examples: | |||
tm-bench localhost:26657 | |||
Flags: | |||
-T int | |||
Exit after the specified amount of time in seconds (default 10) | |||
-broadcast-tx-method string | |||
Broadcast method: async (no guarantees; fastest), sync (ensures tx is checked) or commit (ensures tx is checked and committed; slowest) (default "async") | |||
-c int | |||
Connections to keep open per endpoint (default 1) | |||
-output-format string | |||
Output format: plain or json (default "plain") | |||
-r int | |||
Txs per second to send in a connection (default 1000) | |||
-s int | |||
The size of a transaction in bytes, must be greater than or equal to 40. (default 250) | |||
-v Verbose output | |||
``` | |||
## How stats are collected | |||
These stats are derived by having each connection send transactions at the | |||
specified rate (or as close as it can get) for the specified time. | |||
After the specified time, it iterates over all of the blocks that were created | |||
in that time. | |||
The average and stddev per second are computed based off of that, by | |||
grouping the data by second. | |||
To send transactions at the specified rate in each connection, we loop | |||
through the number of transactions. | |||
If its too slow, the loop stops at one second. | |||
If its too fast, we wait until the one second mark ends. | |||
The transactions per second stat is computed based off of what ends up in the | |||
block. | |||
Note that there will be edge effects on the number of transactions in the first | |||
and last blocks. | |||
This is because transactions may start sending midway through when tendermint | |||
starts building the next block, so it only has half as much time to gather txs | |||
that tm-bench sends. | |||
Similarly the end of the duration will likely end mid-way through tendermint | |||
trying to build the next block. | |||
Each of the connections is handled via two separate goroutines. | |||
## Development | |||
``` | |||
make test | |||
``` |
@ -1,195 +0,0 @@ | |||
package main | |||
import ( | |||
"flag" | |||
"fmt" | |||
"os" | |||
"strings" | |||
"sync" | |||
"time" | |||
"github.com/go-kit/kit/log/term" | |||
cmn "github.com/tendermint/tendermint/libs/common" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/rpc/client" | |||
tmrpc "github.com/tendermint/tendermint/rpc/client" | |||
) | |||
var logger = log.NewNopLogger() | |||
func main() { | |||
var durationInt, txsRate, connections, txSize int | |||
var verbose bool | |||
var outputFormat, broadcastTxMethod string | |||
var usage = "tm-bench [-c 1] [-T 10] [-r 1000] [-s 250]" + | |||
" [endpoints] [-output-format <plain|json> [-broadcast-tx-method <async|sync|commit>]]" | |||
flagSet := flag.NewFlagSet("tm-bench", flag.ExitOnError) | |||
flagSet.IntVar(&connections, "c", 1, "Connections to keep open per endpoint") | |||
flagSet.IntVar(&durationInt, "T", 10, "Exit after the specified amount of time in seconds") | |||
flagSet.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection") | |||
flagSet.IntVar(&txSize, "s", 250, "The size of a transaction in bytes, must be greater than or equal to 40.") | |||
flagSet.StringVar(&outputFormat, "output-format", "plain", "Output format: plain or json") | |||
flagSet.StringVar( | |||
&broadcastTxMethod, | |||
"broadcast-tx-method", | |||
"async", | |||
"Broadcast method: async (no guarantees; fastest),"+ | |||
" sync (ensures tx is checked) or commit (ensures tx is checked and committed; slowest)", | |||
) | |||
flagSet.BoolVar(&verbose, "v", false, "Verbose output") | |||
flagSet.Usage = func() { | |||
fmt.Println(`Tendermint blockchain benchmarking tool. | |||
Usage: | |||
` + usage + ` | |||
Examples: | |||
tm-bench localhost:26657`) | |||
fmt.Println("Flags:") | |||
flagSet.PrintDefaults() | |||
} | |||
flagSet.Parse(os.Args[1:]) | |||
if flagSet.NArg() == 0 { | |||
flagSet.Usage() | |||
os.Exit(1) | |||
} | |||
if verbose { | |||
if outputFormat == "json" { | |||
printErrorAndExit("Verbose mode not supported with json output.") | |||
} | |||
// Color errors red | |||
colorFn := func(keyvals ...interface{}) term.FgBgColor { | |||
for i := 1; i < len(keyvals); i += 2 { | |||
if _, ok := keyvals[i].(error); ok { | |||
return term.FgBgColor{Fg: term.White, Bg: term.Red} | |||
} | |||
} | |||
return term.FgBgColor{} | |||
} | |||
logger = log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn) | |||
fmt.Printf("Running %ds test @ %s\n", durationInt, flagSet.Arg(0)) | |||
} | |||
if txSize < 40 { | |||
printErrorAndExit("The size of a transaction must be greater than or equal to 40.") | |||
} | |||
if broadcastTxMethod != "async" && | |||
broadcastTxMethod != "sync" && | |||
broadcastTxMethod != "commit" { | |||
printErrorAndExit("broadcast-tx-method should be either 'sync', 'async' or 'commit'.") | |||
} | |||
var ( | |||
endpoints = strings.Split(flagSet.Arg(0), ",") | |||
client = tmrpc.NewHTTP(endpoints[0], "/websocket") | |||
initialHeight = latestBlockHeight(client) | |||
) | |||
logger.Info("Latest block height", "h", initialHeight) | |||
transacters := startTransacters( | |||
endpoints, | |||
connections, | |||
txsRate, | |||
txSize, | |||
"broadcast_tx_"+broadcastTxMethod, | |||
) | |||
// Stop upon receiving SIGTERM or CTRL-C. | |||
cmn.TrapSignal(logger, func() { | |||
for _, t := range transacters { | |||
t.Stop() | |||
} | |||
}) | |||
// Wait until transacters have begun until we get the start time. | |||
timeStart := time.Now() | |||
logger.Info("Time last transacter started", "t", timeStart) | |||
duration := time.Duration(durationInt) * time.Second | |||
timeEnd := timeStart.Add(duration) | |||
logger.Info("End time for calculation", "t", timeEnd) | |||
<-time.After(duration) | |||
for i, t := range transacters { | |||
t.Stop() | |||
numCrashes := countCrashes(t.connsBroken) | |||
if numCrashes != 0 { | |||
fmt.Printf("%d connections crashed on transacter #%d\n", numCrashes, i) | |||
} | |||
} | |||
logger.Debug("Time all transacters stopped", "t", time.Now()) | |||
stats, err := calculateStatistics( | |||
client, | |||
initialHeight, | |||
timeStart, | |||
durationInt, | |||
) | |||
if err != nil { | |||
printErrorAndExit(err.Error()) | |||
} | |||
printStatistics(stats, outputFormat) | |||
} | |||
func latestBlockHeight(client client.StatusClient) int64 { | |||
status, err := client.Status() | |||
if err != nil { | |||
fmt.Fprintln(os.Stderr, err) | |||
os.Exit(1) | |||
} | |||
return status.SyncInfo.LatestBlockHeight | |||
} | |||
func countCrashes(crashes []bool) int { | |||
count := 0 | |||
for i := 0; i < len(crashes); i++ { | |||
if crashes[i] { | |||
count++ | |||
} | |||
} | |||
return count | |||
} | |||
func startTransacters( | |||
endpoints []string, | |||
connections, | |||
txsRate int, | |||
txSize int, | |||
broadcastTxMethod string, | |||
) []*transacter { | |||
transacters := make([]*transacter, len(endpoints)) | |||
wg := sync.WaitGroup{} | |||
wg.Add(len(endpoints)) | |||
for i, e := range endpoints { | |||
t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod) | |||
t.SetLogger(logger) | |||
go func(i int) { | |||
defer wg.Done() | |||
if err := t.Start(); err != nil { | |||
fmt.Fprintln(os.Stderr, err) | |||
os.Exit(1) | |||
} | |||
transacters[i] = t | |||
}(i) | |||
} | |||
wg.Wait() | |||
return transacters | |||
} | |||
func printErrorAndExit(err string) { | |||
fmt.Fprintln(os.Stderr, err) | |||
os.Exit(1) | |||
} |
@ -1,150 +0,0 @@ | |||
package main | |||
import ( | |||
"encoding/json" | |||
"fmt" | |||
"math" | |||
"os" | |||
"text/tabwriter" | |||
"time" | |||
metrics "github.com/rcrowley/go-metrics" | |||
tmrpc "github.com/tendermint/tendermint/rpc/client" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
type statistics struct { | |||
TxsThroughput metrics.Histogram `json:"txs_per_sec"` | |||
BlocksThroughput metrics.Histogram `json:"blocks_per_sec"` | |||
} | |||
// calculateStatistics calculates the tx / second, and blocks / second based | |||
// off of the number the transactions and number of blocks that occurred from | |||
// the start block, and the end time. | |||
func calculateStatistics( | |||
client tmrpc.Client, | |||
minHeight int64, | |||
timeStart time.Time, | |||
duration int, | |||
) (*statistics, error) { | |||
timeEnd := timeStart.Add(time.Duration(duration) * time.Second) | |||
stats := &statistics{ | |||
BlocksThroughput: metrics.NewHistogram(metrics.NewUniformSample(1000)), | |||
TxsThroughput: metrics.NewHistogram(metrics.NewUniformSample(1000)), | |||
} | |||
var ( | |||
numBlocksPerSec = make(map[int]int) | |||
numTxsPerSec = make(map[int]int) | |||
) | |||
// because during some seconds blocks won't be created... | |||
for i := 0; i < duration; i++ { | |||
numBlocksPerSec[i] = 0 | |||
numTxsPerSec[i] = 0 | |||
} | |||
blockMetas, err := getBlockMetas(client, minHeight, timeStart, timeEnd) | |||
if err != nil { | |||
return nil, err | |||
} | |||
// iterates from max height to min height | |||
for _, blockMeta := range blockMetas { | |||
// check if block was created after timeStart | |||
if blockMeta.Header.Time.Before(timeStart) { | |||
break | |||
} | |||
// check if block was created before timeEnd | |||
if blockMeta.Header.Time.After(timeEnd) { | |||
continue | |||
} | |||
sec := secondsSinceTimeStart(timeStart, blockMeta.Header.Time) | |||
// increase number of blocks for that second | |||
numBlocksPerSec[sec]++ | |||
// increase number of txs for that second | |||
numTxsPerSec[sec] += blockMeta.NumTxs | |||
logger.Debug(fmt.Sprintf("%d txs at block height %d", blockMeta.NumTxs, blockMeta.Header.Height)) | |||
} | |||
for i := 0; i < duration; i++ { | |||
stats.BlocksThroughput.Update(int64(numBlocksPerSec[i])) | |||
stats.TxsThroughput.Update(int64(numTxsPerSec[i])) | |||
} | |||
return stats, nil | |||
} | |||
func getBlockMetas(client tmrpc.Client, minHeight int64, timeStart, timeEnd time.Time) ([]*types.BlockMeta, error) { | |||
// get blocks between minHeight and last height | |||
// This returns max(minHeight,(last_height - 20)) to last_height | |||
info, err := client.BlockchainInfo(minHeight, 0) | |||
if err != nil { | |||
return nil, err | |||
} | |||
var ( | |||
blockMetas = info.BlockMetas | |||
lastHeight = info.LastHeight | |||
diff = lastHeight - minHeight | |||
offset = len(blockMetas) | |||
) | |||
for offset < int(diff) { | |||
// get blocks between minHeight and last height | |||
info, err := client.BlockchainInfo(minHeight, lastHeight-int64(offset)) | |||
if err != nil { | |||
return nil, err | |||
} | |||
blockMetas = append(blockMetas, info.BlockMetas...) | |||
offset = len(blockMetas) | |||
} | |||
return blockMetas, nil | |||
} | |||
func secondsSinceTimeStart(timeStart, timePassed time.Time) int { | |||
return int(math.Round(timePassed.Sub(timeStart).Seconds())) | |||
} | |||
func printStatistics(stats *statistics, outputFormat string) { | |||
if outputFormat == "json" { | |||
result, err := json.Marshal(struct { | |||
TxsThroughput float64 `json:"txs_per_sec_avg"` | |||
BlocksThroughput float64 `json:"blocks_per_sec_avg"` | |||
}{stats.TxsThroughput.Mean(), stats.BlocksThroughput.Mean()}) | |||
if err != nil { | |||
fmt.Fprintln(os.Stderr, err) | |||
os.Exit(1) | |||
} | |||
fmt.Println(string(result)) | |||
} else { | |||
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', 0) | |||
fmt.Fprintln(w, "Stats\tAvg\tStdDev\tMax\tTotal\t") | |||
fmt.Fprintln( | |||
w, | |||
fmt.Sprintf( | |||
"Txs/sec\t%.0f\t%.0f\t%d\t%d\t", | |||
stats.TxsThroughput.Mean(), | |||
stats.TxsThroughput.StdDev(), | |||
stats.TxsThroughput.Max(), | |||
stats.TxsThroughput.Sum(), | |||
), | |||
) | |||
fmt.Fprintln( | |||
w, | |||
fmt.Sprintf("Blocks/sec\t%.3f\t%.3f\t%d\t%d\t", | |||
stats.BlocksThroughput.Mean(), | |||
stats.BlocksThroughput.StdDev(), | |||
stats.BlocksThroughput.Max(), | |||
stats.BlocksThroughput.Sum(), | |||
), | |||
) | |||
w.Flush() | |||
} | |||
} |
@ -1,287 +0,0 @@ | |||
package main | |||
import ( | |||
"crypto/sha256" | |||
"encoding/binary" | |||
"encoding/hex" | |||
"encoding/json" | |||
"fmt" | |||
// it is ok to use math/rand here: we do not need a cryptographically secure random | |||
// number generator here and we can run the tests a bit faster | |||
"math/rand" | |||
"net" | |||
"net/http" | |||
"net/url" | |||
"os" | |||
"sync" | |||
"time" | |||
"github.com/gorilla/websocket" | |||
"github.com/pkg/errors" | |||
"github.com/tendermint/tendermint/libs/log" | |||
rpctypes "github.com/tendermint/tendermint/rpc/lib/types" | |||
) | |||
const ( | |||
sendTimeout = 10 * time.Second | |||
// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go | |||
pingPeriod = (30 * 9 / 10) * time.Second | |||
) | |||
type transacter struct { | |||
Target string | |||
Rate int | |||
Size int | |||
Connections int | |||
BroadcastTxMethod string | |||
conns []*websocket.Conn | |||
connsBroken []bool | |||
startingWg sync.WaitGroup | |||
endingWg sync.WaitGroup | |||
stopped bool | |||
logger log.Logger | |||
} | |||
func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter { | |||
return &transacter{ | |||
Target: target, | |||
Rate: rate, | |||
Size: size, | |||
Connections: connections, | |||
BroadcastTxMethod: broadcastTxMethod, | |||
conns: make([]*websocket.Conn, connections), | |||
connsBroken: make([]bool, connections), | |||
logger: log.NewNopLogger(), | |||
} | |||
} | |||
// SetLogger lets you set your own logger | |||
func (t *transacter) SetLogger(l log.Logger) { | |||
t.logger = l | |||
} | |||
// Start opens N = `t.Connections` connections to the target and creates read | |||
// and write goroutines for each connection. | |||
func (t *transacter) Start() error { | |||
t.stopped = false | |||
rand.Seed(time.Now().Unix()) | |||
for i := 0; i < t.Connections; i++ { | |||
c, _, err := connect(t.Target) // nolint:bodyclose | |||
if err != nil { | |||
return err | |||
} | |||
t.conns[i] = c | |||
} | |||
t.startingWg.Add(t.Connections) | |||
t.endingWg.Add(2 * t.Connections) | |||
for i := 0; i < t.Connections; i++ { | |||
go t.sendLoop(i) | |||
go t.receiveLoop(i) | |||
} | |||
t.startingWg.Wait() | |||
return nil | |||
} | |||
// Stop closes the connections. | |||
func (t *transacter) Stop() { | |||
t.stopped = true | |||
t.endingWg.Wait() | |||
for _, c := range t.conns { | |||
c.Close() | |||
} | |||
} | |||
// receiveLoop reads messages from the connection (empty in case of | |||
// `broadcast_tx_async`). | |||
func (t *transacter) receiveLoop(connIndex int) { | |||
c := t.conns[connIndex] | |||
defer t.endingWg.Done() | |||
for { | |||
_, _, err := c.ReadMessage() | |||
if err != nil { | |||
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) { | |||
t.logger.Error( | |||
fmt.Sprintf("failed to read response on conn %d", connIndex), | |||
"err", | |||
err, | |||
) | |||
} | |||
return | |||
} | |||
if t.stopped || t.connsBroken[connIndex] { | |||
return | |||
} | |||
} | |||
} | |||
// sendLoop generates transactions at a given rate. | |||
func (t *transacter) sendLoop(connIndex int) { | |||
started := false | |||
// Close the starting waitgroup, in the event that this fails to start | |||
defer func() { | |||
if !started { | |||
t.startingWg.Done() | |||
} | |||
}() | |||
c := t.conns[connIndex] | |||
c.SetPingHandler(func(message string) error { | |||
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout)) | |||
if err == websocket.ErrCloseSent { | |||
return nil | |||
} else if e, ok := err.(net.Error); ok && e.Temporary() { | |||
return nil | |||
} | |||
return err | |||
}) | |||
logger := t.logger.With("addr", c.RemoteAddr()) | |||
var txNumber = 0 | |||
pingsTicker := time.NewTicker(pingPeriod) | |||
txsTicker := time.NewTicker(1 * time.Second) | |||
defer func() { | |||
pingsTicker.Stop() | |||
txsTicker.Stop() | |||
t.endingWg.Done() | |||
}() | |||
// hash of the host name is a part of each tx | |||
var hostnameHash [sha256.Size]byte | |||
hostname, err := os.Hostname() | |||
if err != nil { | |||
hostname = "127.0.0.1" | |||
} | |||
hostnameHash = sha256.Sum256([]byte(hostname)) | |||
// each transaction embeds connection index, tx number and hash of the hostname | |||
// we update the tx number between successive txs | |||
tx := generateTx(connIndex, txNumber, t.Size, hostnameHash) | |||
txHex := make([]byte, len(tx)*2) | |||
hex.Encode(txHex, tx) | |||
for { | |||
select { | |||
case <-txsTicker.C: | |||
startTime := time.Now() | |||
endTime := startTime.Add(time.Second) | |||
numTxSent := t.Rate | |||
if !started { | |||
t.startingWg.Done() | |||
started = true | |||
} | |||
now := time.Now() | |||
for i := 0; i < t.Rate; i++ { | |||
// update tx number of the tx, and the corresponding hex | |||
updateTx(tx, txHex, txNumber) | |||
paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex}) | |||
if err != nil { | |||
fmt.Printf("failed to encode params: %v\n", err) | |||
os.Exit(1) | |||
} | |||
rawParamsJSON := json.RawMessage(paramsJSON) | |||
c.SetWriteDeadline(now.Add(sendTimeout)) | |||
err = c.WriteJSON(rpctypes.RPCRequest{ | |||
JSONRPC: "2.0", | |||
ID: rpctypes.JSONRPCIntID((connIndex * t.Rate) + i), | |||
Method: t.BroadcastTxMethod, | |||
Params: rawParamsJSON, | |||
}) | |||
if err != nil { | |||
err = errors.Wrap(err, | |||
fmt.Sprintf("txs send failed on connection #%d", connIndex)) | |||
t.connsBroken[connIndex] = true | |||
logger.Error(err.Error()) | |||
return | |||
} | |||
// cache the time.Now() reads to save time. | |||
if i%5 == 0 { | |||
now = time.Now() | |||
if now.After(endTime) { | |||
// Plus one accounts for sending this tx | |||
numTxSent = i + 1 | |||
break | |||
} | |||
} | |||
txNumber++ | |||
} | |||
timeToSend := time.Since(startTime) | |||
logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend) | |||
if timeToSend < 1*time.Second { | |||
sleepTime := time.Second - timeToSend | |||
logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds())) | |||
time.Sleep(sleepTime) | |||
} | |||
case <-pingsTicker.C: | |||
// go-rpc server closes the connection in the absence of pings | |||
c.SetWriteDeadline(time.Now().Add(sendTimeout)) | |||
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { | |||
err = errors.Wrap(err, | |||
fmt.Sprintf("failed to write ping message on conn #%d", connIndex)) | |||
logger.Error(err.Error()) | |||
t.connsBroken[connIndex] = true | |||
} | |||
} | |||
if t.stopped { | |||
// To cleanly close a connection, a client should send a close | |||
// frame and wait for the server to close the connection. | |||
c.SetWriteDeadline(time.Now().Add(sendTimeout)) | |||
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | |||
if err != nil { | |||
err = errors.Wrap(err, | |||
fmt.Sprintf("failed to write close message on conn #%d", connIndex)) | |||
logger.Error(err.Error()) | |||
t.connsBroken[connIndex] = true | |||
} | |||
return | |||
} | |||
} | |||
} | |||
func connect(host string) (*websocket.Conn, *http.Response, error) { | |||
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"} | |||
return websocket.DefaultDialer.Dial(u.String(), nil) | |||
} | |||
func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [sha256.Size]byte) []byte { | |||
tx := make([]byte, txSize) | |||
binary.PutUvarint(tx[:8], uint64(connIndex)) | |||
binary.PutUvarint(tx[8:16], uint64(txNumber)) | |||
copy(tx[16:32], hostnameHash[:16]) | |||
binary.PutUvarint(tx[32:40], uint64(time.Now().Unix())) | |||
// 40-* random data | |||
if _, err := rand.Read(tx[40:]); err != nil { //nolint: gosec | |||
panic(errors.Wrap(err, "failed to read random bytes")) | |||
} | |||
return tx | |||
} | |||
// warning, mutates input byte slice | |||
func updateTx(tx []byte, txHex []byte, txNumber int) { | |||
binary.PutUvarint(tx[8:16], uint64(txNumber)) | |||
hexUpdate := make([]byte, 16) | |||
hex.Encode(hexUpdate, tx[8:16]) | |||
for i := 16; i < 32; i++ { | |||
txHex[i] = hexUpdate[i-16] | |||
} | |||
} |
@ -1,104 +0,0 @@ | |||
package main | |||
import ( | |||
"crypto/sha256" | |||
"encoding/hex" | |||
"encoding/json" | |||
"fmt" | |||
"os" | |||
"testing" | |||
"time" | |||
"github.com/pkg/errors" | |||
"github.com/stretchr/testify/require" | |||
) | |||
// This test tests that the output of generate tx and update tx is consistent | |||
func TestGenerateTxUpdateTxConsistentency(t *testing.T) { | |||
cases := []struct { | |||
connIndex int | |||
startingTxNumber int | |||
txSize int | |||
hostname string | |||
numTxsToTest int | |||
}{ | |||
{0, 0, 40, "localhost:26657", 1000}, | |||
{70, 300, 10000, "localhost:26657", 1000}, | |||
{0, 50, 100000, "localhost:26657", 1000}, | |||
} | |||
for tcIndex, tc := range cases { | |||
hostnameHash := sha256.Sum256([]byte(tc.hostname)) | |||
// Tx generated from update tx. This is defined outside of the loop, since we have | |||
// to a have something initially to update | |||
updatedTx := generateTx(tc.connIndex, tc.startingTxNumber, tc.txSize, hostnameHash) | |||
updatedHex := make([]byte, len(updatedTx)*2) | |||
hex.Encode(updatedHex, updatedTx) | |||
for i := 0; i < tc.numTxsToTest; i++ { | |||
expectedTx := generateTx(tc.connIndex, tc.startingTxNumber+i, tc.txSize, hostnameHash) | |||
expectedHex := make([]byte, len(expectedTx)*2) | |||
hex.Encode(expectedHex, expectedTx) | |||
updateTx(updatedTx, updatedHex, tc.startingTxNumber+i) | |||
// after first 32 bytes is 8 bytes of time, then purely random bytes | |||
require.Equal(t, expectedTx[:32], updatedTx[:32], | |||
"First 32 bytes of the txs differed. tc #%d, i #%d", tcIndex, i) | |||
require.Equal(t, expectedHex[:64], updatedHex[:64], | |||
"First 64 bytes of the hex differed. tc #%d, i #%d", tcIndex, i) | |||
// Test the lengths of the txs are as expected | |||
require.Equal(t, tc.txSize, len(expectedTx), | |||
"Length of expected Tx differed. tc #%d, i #%d", tcIndex, i) | |||
require.Equal(t, tc.txSize, len(updatedTx), | |||
"Length of expected Tx differed. tc #%d, i #%d", tcIndex, i) | |||
require.Equal(t, tc.txSize*2, len(expectedHex), | |||
"Length of expected hex differed. tc #%d, i #%d", tcIndex, i) | |||
require.Equal(t, tc.txSize*2, len(updatedHex), | |||
"Length of updated hex differed. tc #%d, i #%d", tcIndex, i) | |||
} | |||
} | |||
} | |||
func BenchmarkIterationOfSendLoop(b *testing.B) { | |||
var ( | |||
connIndex = 0 | |||
txSize = 25000 | |||
) | |||
now := time.Now() | |||
// something too far away to matter | |||
endTime := now.Add(time.Hour) | |||
txNumber := 0 | |||
hostnameHash := sha256.Sum256([]byte{0}) | |||
tx := generateTx(connIndex, txNumber, txSize, hostnameHash) | |||
txHex := make([]byte, len(tx)*2) | |||
hex.Encode(txHex, tx) | |||
b.ResetTimer() | |||
for i := 0; i < b.N; i++ { | |||
updateTx(tx, txHex, txNumber) | |||
paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex}) | |||
if err != nil { | |||
fmt.Printf("failed to encode params: %v\n", err) | |||
os.Exit(1) | |||
} | |||
_ = json.RawMessage(paramsJSON) | |||
_ = now.Add(sendTimeout) | |||
if err != nil { | |||
err = errors.Wrap(err, | |||
fmt.Sprintf("txs send failed on connection #%d", connIndex)) | |||
logger.Error(err.Error()) | |||
return | |||
} | |||
// Cache the now operations | |||
if i%5 == 0 { | |||
now = time.Now() | |||
if now.After(endTime) { | |||
break | |||
} | |||
} | |||
txNumber++ | |||
} | |||
} |