From e8127456ca7cd187ddcc63c0129f31d5aac3824f Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 2 Jul 2018 03:19:51 -0700 Subject: [PATCH] tmbench: Make it more resilient to WSConn breaking (#111) * tmbench: Make it more resilient to WSConn breaking This commit changes the behavior of a broken connection from calling os.Exit, to instead killing that connection. This also improves the debug logging, by specifying connection index within errors. * Rename connStatus to connsBroken * change logging level --- tm-bench/README.md | 2 +- tm-bench/main.go | 16 +++++++++++++++- tm-bench/transacter.go | 39 ++++++++++++++++++++++++++++----------- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/tm-bench/README.md b/tm-bench/README.md index 9a12a68c3..811141629 100644 --- a/tm-bench/README.md +++ b/tm-bench/README.md @@ -61,7 +61,7 @@ through the number of transactions. If its too slow, the loop stops at one secon If its too fast, we wait until the one second mark ends. The transactions per second stat is computed based off of what ends up in the block. -Each of the connections is running via a separate goroutine. +Each of the connections is handled via two separate goroutines. ## Development diff --git a/tm-bench/main.go b/tm-bench/main.go index d7a6dc940..87a176f95 100644 --- a/tm-bench/main.go +++ b/tm-bench/main.go @@ -108,8 +108,12 @@ Examples: endTime := time.Duration(duration) * time.Second select { case <-time.After(endTime): - for _, t := range transacters { + for i, t := range transacters { t.Stop() + numCrashes := countCrashes(t.connsBroken) + if numCrashes != 0 { + fmt.Printf("%d connections crashed on transacter #%d\n", numCrashes, i) + } } timeStop := time.Now() @@ -142,6 +146,16 @@ func latestBlockHeight(client tmrpc.Client) int64 { return status.SyncInfo.LatestBlockHeight } +func countCrashes(crashes []bool) int { + count := 0 + for i := 0; i < len(crashes); i++ { + if crashes[i] { + count++ + } + } + return count +} + // calculateStatistics calculates the tx / second, and blocks / second based // off of the number the transactions and number of blocks that occurred from // the start block, and the end time. diff --git a/tm-bench/transacter.go b/tm-bench/transacter.go index caff026b2..d3a43774d 100644 --- a/tm-bench/transacter.go +++ b/tm-bench/transacter.go @@ -34,9 +34,10 @@ type transacter struct { Connections int BroadcastTxMethod string - conns []*websocket.Conn - wg sync.WaitGroup - stopped bool + conns []*websocket.Conn + connsBroken []bool + wg sync.WaitGroup + stopped bool logger log.Logger } @@ -49,6 +50,7 @@ func newTransacter(target string, connections, rate int, size int, broadcastTxMe Connections: connections, BroadcastTxMethod: broadcastTxMethod, conns: make([]*websocket.Conn, connections), + connsBroken: make([]bool, connections), logger: log.NewNopLogger(), } } @@ -100,11 +102,15 @@ func (t *transacter) receiveLoop(connIndex int) { _, _, err := c.ReadMessage() if err != nil { if !websocket.IsCloseError(err, websocket.CloseNormalClosure) { - t.logger.Error("failed to read response", "err", err) + t.logger.Error( + fmt.Sprintf("failed to read response on conn %d", connIndex), + "err", + err, + ) } return } - if t.stopped { + if t.stopped || t.connsBroken[connIndex] { return } } @@ -169,8 +175,11 @@ func (t *transacter) sendLoop(connIndex int) { Params: rawParamsJSON, }) if err != nil { - fmt.Fprintf(os.Stderr, "%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed")) - os.Exit(1) + err = errors.Wrap(err, + fmt.Sprintf("txs send failed on connection #%d", connIndex)) + t.connsBroken[connIndex] = true + logger.Error(err.Error()) + return } // Time added here is 7.13 ns/op, not significant enough to worry about @@ -186,16 +195,21 @@ func (t *transacter) sendLoop(connIndex int) { } timeToSend := time.Now().Sub(startTime) + logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend) if timeToSend < 1*time.Second { - time.Sleep(time.Second - timeToSend) + sleepTime := time.Second - timeToSend + logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds())) + time.Sleep(sleepTime) } - logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend) case <-pingsTicker.C: // go-rpc server closes the connection in the absence of pings c.SetWriteDeadline(time.Now().Add(sendTimeout)) if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - logger.Error("failed to write ping message", "err", err) + err = errors.Wrap(err, + fmt.Sprintf("failed to write ping message on conn #%d", connIndex)) + logger.Error(err.Error()) + t.connsBroken[connIndex] = true } } @@ -205,7 +219,10 @@ func (t *transacter) sendLoop(connIndex int) { c.SetWriteDeadline(time.Now().Add(sendTimeout)) err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { - logger.Error("failed to write close message", "err", err) + err = errors.Wrap(err, + fmt.Sprintf("failed to write close message on conn #%d", connIndex)) + logger.Error(err.Error()) + t.connsBroken[connIndex] = true } return