|
|
@ -5,101 +5,177 @@ import ( |
|
|
|
"encoding/hex" |
|
|
|
"fmt" |
|
|
|
"math/rand" |
|
|
|
"net/http" |
|
|
|
"net/url" |
|
|
|
"os" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
"github.com/gorilla/websocket" |
|
|
|
"github.com/pkg/errors" |
|
|
|
|
|
|
|
rpcclient "github.com/tendermint/go-rpc/client" |
|
|
|
rpctypes "github.com/tendermint/go-rpc/types" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
sendTimeout = 500 * time.Millisecond |
|
|
|
// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
|
|
|
|
pingPeriod = (30 * 9 / 10) * time.Second |
|
|
|
) |
|
|
|
|
|
|
|
type transacter struct { |
|
|
|
Target string |
|
|
|
Rate int |
|
|
|
Connections int |
|
|
|
|
|
|
|
conns []*rpcclient.WSClient |
|
|
|
conns []*websocket.Conn |
|
|
|
wg sync.WaitGroup |
|
|
|
stopped bool |
|
|
|
|
|
|
|
logger log.Logger |
|
|
|
} |
|
|
|
|
|
|
|
func newTransacter(target string, connections int, rate int) *transacter { |
|
|
|
conns := make([]*rpcclient.WSClient, connections) |
|
|
|
for i := 0; i < connections; i++ { |
|
|
|
conns[i] = rpcclient.NewWSClient(target, "/websocket") |
|
|
|
} |
|
|
|
|
|
|
|
return &transacter{ |
|
|
|
Target: target, |
|
|
|
Rate: rate, |
|
|
|
Connections: connections, |
|
|
|
conns: conns, |
|
|
|
conns: make([]*websocket.Conn, connections), |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// SetLogger lets you set your own logger
|
|
|
|
func (t *transacter) SetLogger(l log.Logger) { |
|
|
|
t.logger = l |
|
|
|
} |
|
|
|
|
|
|
|
// Start opens N = `t.Connections` connections to the target and creates read
|
|
|
|
// and write goroutines for each connection.
|
|
|
|
func (t *transacter) Start() error { |
|
|
|
t.stopped = false |
|
|
|
|
|
|
|
for _, c := range t.conns { |
|
|
|
if _, err := c.Start(); err != nil { |
|
|
|
for i := 0; i < t.Connections; i++ { |
|
|
|
c, _, err := connect(t.Target) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
t.conns[i] = c |
|
|
|
} |
|
|
|
|
|
|
|
t.wg.Add(2 * t.Connections) |
|
|
|
for i := 0; i < t.Connections; i++ { |
|
|
|
t.wg.Add(1) |
|
|
|
go t.sendLoop(i) |
|
|
|
go t.receiveLoop(i) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Stop closes the connections.
|
|
|
|
func (t *transacter) Stop() { |
|
|
|
t.stopped = true |
|
|
|
t.wg.Wait() |
|
|
|
for _, c := range t.conns { |
|
|
|
c.Stop() |
|
|
|
c.Close() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// receiveLoop reads messages from the connection (empty in case of
|
|
|
|
// `broadcast_tx_async`).
|
|
|
|
func (t *transacter) receiveLoop(connIndex int) { |
|
|
|
c := t.conns[connIndex] |
|
|
|
defer t.wg.Done() |
|
|
|
for { |
|
|
|
_, _, err := c.ReadMessage() |
|
|
|
if err != nil { |
|
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { |
|
|
|
t.logger.Log("err", errors.Wrap(err, "failed to read response")) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
if t.stopped { |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// sendLoop generates transactions at a given rate.
|
|
|
|
func (t *transacter) sendLoop(connIndex int) { |
|
|
|
conn := t.conns[connIndex] |
|
|
|
c := t.conns[connIndex] |
|
|
|
logger := log.With(t.logger, "addr", c.RemoteAddr()) |
|
|
|
|
|
|
|
var txNumber = 0 |
|
|
|
|
|
|
|
pingsTicker := time.NewTicker(pingPeriod) |
|
|
|
txsTicker := time.NewTicker(1 * time.Second) |
|
|
|
defer func() { |
|
|
|
pingsTicker.Stop() |
|
|
|
txsTicker.Stop() |
|
|
|
t.wg.Done() |
|
|
|
}() |
|
|
|
|
|
|
|
var num = 0 |
|
|
|
for { |
|
|
|
startTime := time.Now() |
|
|
|
select { |
|
|
|
case <-txsTicker.C: |
|
|
|
startTime := time.Now() |
|
|
|
|
|
|
|
for i := 0; i < t.Rate; i++ { |
|
|
|
// each transaction embeds connection index and tx number
|
|
|
|
tx := generateTx(connIndex, txNumber) |
|
|
|
|
|
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout)) |
|
|
|
err := c.WriteJSON(rpctypes.RPCRequest{ |
|
|
|
JSONRPC: "2.0", |
|
|
|
ID: "", |
|
|
|
Method: "broadcast_tx_async", |
|
|
|
Params: []interface{}{hex.EncodeToString(tx)}, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed")) |
|
|
|
os.Exit(1) |
|
|
|
} |
|
|
|
|
|
|
|
txNumber++ |
|
|
|
} |
|
|
|
|
|
|
|
for i := 0; i < t.Rate; i++ { |
|
|
|
if t.stopped { |
|
|
|
t.wg.Done() |
|
|
|
return |
|
|
|
timeToSend := time.Now().Sub(startTime) |
|
|
|
time.Sleep(time.Second - timeToSend) |
|
|
|
logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend) |
|
|
|
case <-pingsTicker.C: |
|
|
|
// Right now go-rpc server closes the connection in the absence of pings
|
|
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout)) |
|
|
|
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { |
|
|
|
logger.Log("err", errors.Wrap(err, "failed to write ping message")) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
tx := generateTx(connIndex, num) |
|
|
|
err := conn.WriteJSON(rpctypes.RPCRequest{ |
|
|
|
JSONRPC: "2.0", |
|
|
|
ID: "", |
|
|
|
Method: "broadcast_tx_async", |
|
|
|
Params: []interface{}{hex.EncodeToString(tx)}, |
|
|
|
}) |
|
|
|
if t.stopped { |
|
|
|
// To cleanly close a connection, a client should send a close
|
|
|
|
// frame and wait for the server to close the connection.
|
|
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout)) |
|
|
|
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) |
|
|
|
if err != nil { |
|
|
|
panic(errors.Wrap(err, fmt.Sprintf("lost connection to %s", conn.Address))) |
|
|
|
logger.Log("err", errors.Wrap(err, "failed to write close message")) |
|
|
|
} |
|
|
|
num++ |
|
|
|
} |
|
|
|
|
|
|
|
timeToSend := time.Now().Sub(startTime) |
|
|
|
time.Sleep(time.Second - timeToSend) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func connect(host string) (*websocket.Conn, *http.Response, error) { |
|
|
|
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"} |
|
|
|
return websocket.DefaultDialer.Dial(u.String(), nil) |
|
|
|
} |
|
|
|
|
|
|
|
func generateTx(a int, b int) []byte { |
|
|
|
tx := make([]byte, 250) |
|
|
|
binary.PutUvarint(tx[:32], uint64(a)) |
|
|
|
binary.PutUvarint(tx[32:64], uint64(b)) |
|
|
|
if _, err := rand.Read(tx[234:]); err != nil { |
|
|
|
panic(errors.Wrap(err, "err reading from crypto/rand")) |
|
|
|
panic(errors.Wrap(err, "failed to generate transaction")) |
|
|
|
} |
|
|
|
return tx |
|
|
|
} |