Browse Source

tmbench: Make it more resilient to WSConn breaking (#111)

* tmbench: Make it more resilient to WSConn breaking

This commit changes the behavior of a broken connection from calling
os.Exit, to instead killing that connection. This also improves
the debug logging, by specifying connection index within errors.

* Rename connStatus to connsBroken

* change logging level
pull/1943/head
Dev Ojha 6 years ago
committed by Anton Kaliaev
parent
commit
e8127456ca
3 changed files with 44 additions and 13 deletions
  1. +1
    -1
      tm-bench/README.md
  2. +15
    -1
      tm-bench/main.go
  3. +28
    -11
      tm-bench/transacter.go

+ 1
- 1
tm-bench/README.md View File

@ -61,7 +61,7 @@ through the number of transactions. If its too slow, the loop stops at one secon
If its too fast, we wait until the one second mark ends. The transactions per If its too fast, we wait until the one second mark ends. The transactions per
second stat is computed based off of what ends up in the block. second stat is computed based off of what ends up in the block.
Each of the connections is running via a separate goroutine.
Each of the connections is handled via two separate goroutines.
## Development ## Development


+ 15
- 1
tm-bench/main.go View File

@ -108,8 +108,12 @@ Examples:
endTime := time.Duration(duration) * time.Second endTime := time.Duration(duration) * time.Second
select { select {
case <-time.After(endTime): case <-time.After(endTime):
for _, t := range transacters {
for i, t := range transacters {
t.Stop() t.Stop()
numCrashes := countCrashes(t.connsBroken)
if numCrashes != 0 {
fmt.Printf("%d connections crashed on transacter #%d\n", numCrashes, i)
}
} }
timeStop := time.Now() timeStop := time.Now()
@ -142,6 +146,16 @@ func latestBlockHeight(client tmrpc.Client) int64 {
return status.SyncInfo.LatestBlockHeight return status.SyncInfo.LatestBlockHeight
} }
func countCrashes(crashes []bool) int {
count := 0
for i := 0; i < len(crashes); i++ {
if crashes[i] {
count++
}
}
return count
}
// calculateStatistics calculates the tx / second, and blocks / second based // calculateStatistics calculates the tx / second, and blocks / second based
// off of the number the transactions and number of blocks that occurred from // off of the number the transactions and number of blocks that occurred from
// the start block, and the end time. // the start block, and the end time.


+ 28
- 11
tm-bench/transacter.go View File

@ -34,9 +34,10 @@ type transacter struct {
Connections int Connections int
BroadcastTxMethod string BroadcastTxMethod string
conns []*websocket.Conn
wg sync.WaitGroup
stopped bool
conns []*websocket.Conn
connsBroken []bool
wg sync.WaitGroup
stopped bool
logger log.Logger logger log.Logger
} }
@ -49,6 +50,7 @@ func newTransacter(target string, connections, rate int, size int, broadcastTxMe
Connections: connections, Connections: connections,
BroadcastTxMethod: broadcastTxMethod, BroadcastTxMethod: broadcastTxMethod,
conns: make([]*websocket.Conn, connections), conns: make([]*websocket.Conn, connections),
connsBroken: make([]bool, connections),
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
} }
} }
@ -100,11 +102,15 @@ func (t *transacter) receiveLoop(connIndex int) {
_, _, err := c.ReadMessage() _, _, err := c.ReadMessage()
if err != nil { if err != nil {
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) { if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
t.logger.Error("failed to read response", "err", err)
t.logger.Error(
fmt.Sprintf("failed to read response on conn %d", connIndex),
"err",
err,
)
} }
return return
} }
if t.stopped {
if t.stopped || t.connsBroken[connIndex] {
return return
} }
} }
@ -169,8 +175,11 @@ func (t *transacter) sendLoop(connIndex int) {
Params: rawParamsJSON, Params: rawParamsJSON,
}) })
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
os.Exit(1)
err = errors.Wrap(err,
fmt.Sprintf("txs send failed on connection #%d", connIndex))
t.connsBroken[connIndex] = true
logger.Error(err.Error())
return
} }
// Time added here is 7.13 ns/op, not significant enough to worry about // Time added here is 7.13 ns/op, not significant enough to worry about
@ -186,16 +195,21 @@ func (t *transacter) sendLoop(connIndex int) {
} }
timeToSend := time.Now().Sub(startTime) timeToSend := time.Now().Sub(startTime)
logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
if timeToSend < 1*time.Second { if timeToSend < 1*time.Second {
time.Sleep(time.Second - timeToSend)
sleepTime := time.Second - timeToSend
logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
time.Sleep(sleepTime)
} }
logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
case <-pingsTicker.C: case <-pingsTicker.C:
// go-rpc server closes the connection in the absence of pings // go-rpc server closes the connection in the absence of pings
c.SetWriteDeadline(time.Now().Add(sendTimeout)) c.SetWriteDeadline(time.Now().Add(sendTimeout))
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
logger.Error("failed to write ping message", "err", err)
err = errors.Wrap(err,
fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
logger.Error(err.Error())
t.connsBroken[connIndex] = true
} }
} }
@ -205,7 +219,10 @@ func (t *transacter) sendLoop(connIndex int) {
c.SetWriteDeadline(time.Now().Add(sendTimeout)) c.SetWriteDeadline(time.Now().Add(sendTimeout))
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil { if err != nil {
logger.Error("failed to write close message", "err", err)
err = errors.Wrap(err,
fmt.Sprintf("failed to write close message on conn #%d", connIndex))
logger.Error(err.Error())
t.connsBroken[connIndex] = true
} }
return return


Loading…
Cancel
Save