diff --git a/tm-bench/Dockerfile b/tm-bench/Dockerfile new file mode 100644 index 000000000..6e08e6543 --- /dev/null +++ b/tm-bench/Dockerfile @@ -0,0 +1,6 @@ +FROM alpine:3.5 + +WORKDIR /app +COPY tm-bench /app/tm-bench + +ENTRYPOINT ["./tm-bench"] diff --git a/tm-bench/Makefile b/tm-bench/Makefile index cf31dbd36..1d4c8a49b 100644 --- a/tm-bench/Makefile +++ b/tm-bench/Makefile @@ -38,7 +38,7 @@ dist: build-all build-docker: rm -f ./tm-bench docker run -it --rm -v "$(PWD):/go/src/app" -w "/go/src/app" -e "CGO_ENABLED=0" golang:alpine go build -ldflags "-X main.version=${VERSION}" -o tm-bench - docker build -t "tendermint/monitor" . + docker build -t "tendermint/bench" . clean: rm -f ./tm-bench diff --git a/tm-bench/glide.yaml b/tm-bench/glide.yaml index cb255784a..c9f8bf597 100644 --- a/tm-bench/glide.yaml +++ b/tm-bench/glide.yaml @@ -8,3 +8,7 @@ import: - package: github.com/tendermint/tools subpackages: - tm-monitor/monitor +- package: github.com/go-kit/kit + subpackages: + - log + - term diff --git a/tm-bench/main.go b/tm-bench/main.go index fbeac5c7a..34bb4cd19 100644 --- a/tm-bench/main.go +++ b/tm-bench/main.go @@ -8,6 +8,8 @@ import ( "text/tabwriter" "time" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/term" metrics "github.com/rcrowley/go-metrics" tmtypes "github.com/tendermint/tendermint/types" "github.com/tendermint/tools/tm-monitor/monitor" @@ -15,6 +17,8 @@ import ( var version = "0.1.0.pre" +var logger = log.NewNopLogger() + type statistics struct { BlockTimeSample metrics.Histogram TxThroughputSample metrics.Histogram @@ -23,10 +27,12 @@ type statistics struct { func main() { var duration, txsRate, connections int + var verbose bool flag.IntVar(&connections, "c", 1, "Connections to keep open per endpoint") flag.IntVar(&duration, "T", 10, "Exit after the specified amount of time in seconds") flag.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection") + flag.BoolVar(&verbose, "v", false, "Verbose output") flag.Usage = func() { fmt.Println(`Tendermint blockchain benchmarking tool. @@ -47,6 +53,19 @@ Examples: os.Exit(1) } + if verbose { + // Color errors red + colorFn := func(keyvals ...interface{}) term.FgBgColor { + for i := 1; i < len(keyvals); i += 2 { + if _, ok := keyvals[i].(error); ok { + return term.FgBgColor{Fg: term.White, Bg: term.Red} + } + } + return term.FgBgColor{} + } + logger = term.NewLogger(os.Stdout, log.NewLogfmtLogger, colorFn) + } + fmt.Printf("Running %ds test @ %s\n", duration, flag.Arg(0)) endpoints := strings.Split(flag.Arg(0), ",") @@ -104,10 +123,12 @@ func startNodes(endpoints []string, blockCh chan<- tmtypes.Header, blockLatencyC for i, e := range endpoints { n := monitor.NewNode(e) + n.SetLogger(log.With(logger, "node", e)) n.SendBlocksTo(blockCh) n.SendBlockLatenciesTo(blockLatencyCh) if err := n.Start(); err != nil { - panic(err) + fmt.Println(err) + os.Exit(1) } nodes[i] = n } @@ -120,8 +141,10 @@ func startTransacters(endpoints []string, connections int, txsRate int) []*trans for i, e := range endpoints { t := newTransacter(e, connections, txsRate) + t.SetLogger(logger) if err := t.Start(); err != nil { - panic(err) + fmt.Println(err) + os.Exit(1) } transacters[i] = t } diff --git a/tm-bench/transacter.go b/tm-bench/transacter.go index d35a9e6f0..4a9161f4c 100644 --- a/tm-bench/transacter.go +++ b/tm-bench/transacter.go @@ -5,101 +5,177 @@ import ( "encoding/hex" "fmt" "math/rand" + "net/http" + "net/url" + "os" "sync" "time" + "github.com/go-kit/kit/log" + "github.com/gorilla/websocket" "github.com/pkg/errors" - rpcclient "github.com/tendermint/go-rpc/client" rpctypes "github.com/tendermint/go-rpc/types" ) +const ( + sendTimeout = 500 * time.Millisecond + // see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313 + pingPeriod = (30 * 9 / 10) * time.Second +) + type transacter struct { Target string Rate int Connections int - conns []*rpcclient.WSClient + conns []*websocket.Conn wg sync.WaitGroup stopped bool + + logger log.Logger } func newTransacter(target string, connections int, rate int) *transacter { - conns := make([]*rpcclient.WSClient, connections) - for i := 0; i < connections; i++ { - conns[i] = rpcclient.NewWSClient(target, "/websocket") - } - return &transacter{ Target: target, Rate: rate, Connections: connections, - conns: conns, + conns: make([]*websocket.Conn, connections), + logger: log.NewNopLogger(), } } +// SetLogger lets you set your own logger +func (t *transacter) SetLogger(l log.Logger) { + t.logger = l +} + +// Start opens N = `t.Connections` connections to the target and creates read +// and write goroutines for each connection. func (t *transacter) Start() error { t.stopped = false - for _, c := range t.conns { - if _, err := c.Start(); err != nil { + for i := 0; i < t.Connections; i++ { + c, _, err := connect(t.Target) + if err != nil { return err } + t.conns[i] = c } + t.wg.Add(2 * t.Connections) for i := 0; i < t.Connections; i++ { - t.wg.Add(1) go t.sendLoop(i) + go t.receiveLoop(i) } return nil } +// Stop closes the connections. func (t *transacter) Stop() { t.stopped = true t.wg.Wait() for _, c := range t.conns { - c.Stop() + c.Close() + } +} + +// receiveLoop reads messages from the connection (empty in case of +// `broadcast_tx_async`). +func (t *transacter) receiveLoop(connIndex int) { + c := t.conns[connIndex] + defer t.wg.Done() + for { + _, _, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + t.logger.Log("err", errors.Wrap(err, "failed to read response")) + } + return + } + if t.stopped { + return + } } } +// sendLoop generates transactions at a given rate. func (t *transacter) sendLoop(connIndex int) { - conn := t.conns[connIndex] + c := t.conns[connIndex] + logger := log.With(t.logger, "addr", c.RemoteAddr()) + + var txNumber = 0 + + pingsTicker := time.NewTicker(pingPeriod) + txsTicker := time.NewTicker(1 * time.Second) + defer func() { + pingsTicker.Stop() + txsTicker.Stop() + t.wg.Done() + }() - var num = 0 for { - startTime := time.Now() + select { + case <-txsTicker.C: + startTime := time.Now() + + for i := 0; i < t.Rate; i++ { + // each transaction embeds connection index and tx number + tx := generateTx(connIndex, txNumber) + + c.SetWriteDeadline(time.Now().Add(sendTimeout)) + err := c.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "broadcast_tx_async", + Params: []interface{}{hex.EncodeToString(tx)}, + }) + if err != nil { + fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed")) + os.Exit(1) + } + + txNumber++ + } - for i := 0; i < t.Rate; i++ { - if t.stopped { - t.wg.Done() - return + timeToSend := time.Now().Sub(startTime) + time.Sleep(time.Second - timeToSend) + logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend) + case <-pingsTicker.C: + // Right now go-rpc server closes the connection in the absence of pings + c.SetWriteDeadline(time.Now().Add(sendTimeout)) + if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + logger.Log("err", errors.Wrap(err, "failed to write ping message")) } + } - tx := generateTx(connIndex, num) - err := conn.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "broadcast_tx_async", - Params: []interface{}{hex.EncodeToString(tx)}, - }) + if t.stopped { + // To cleanly close a connection, a client should send a close + // frame and wait for the server to close the connection. + c.SetWriteDeadline(time.Now().Add(sendTimeout)) + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { - panic(errors.Wrap(err, fmt.Sprintf("lost connection to %s", conn.Address))) + logger.Log("err", errors.Wrap(err, "failed to write close message")) } - num++ - } - timeToSend := time.Now().Sub(startTime) - time.Sleep(time.Second - timeToSend) + return + } } } +func connect(host string) (*websocket.Conn, *http.Response, error) { + u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"} + return websocket.DefaultDialer.Dial(u.String(), nil) +} + func generateTx(a int, b int) []byte { tx := make([]byte, 250) binary.PutUvarint(tx[:32], uint64(a)) binary.PutUvarint(tx[32:64], uint64(b)) if _, err := rand.Read(tx[234:]); err != nil { - panic(errors.Wrap(err, "err reading from crypto/rand")) + panic(errors.Wrap(err, "failed to generate transaction")) } return tx }