package main
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
// it is ok to use math/rand here: we do not need a cryptographically secure random
|
|
// number generator here and we can run the tests a bit faster
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
|
)
|
|
|
|
const (
|
|
sendTimeout = 10 * time.Second
|
|
// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
|
|
pingPeriod = (30 * 9 / 10) * time.Second
|
|
)
|
|
|
|
type transacter struct {
|
|
Target string
|
|
Rate int
|
|
Size int
|
|
Connections int
|
|
BroadcastTxMethod string
|
|
|
|
conns []*websocket.Conn
|
|
connsBroken []bool
|
|
startingWg sync.WaitGroup
|
|
endingWg sync.WaitGroup
|
|
stopped bool
|
|
|
|
logger log.Logger
|
|
}
|
|
|
|
func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
|
|
return &transacter{
|
|
Target: target,
|
|
Rate: rate,
|
|
Size: size,
|
|
Connections: connections,
|
|
BroadcastTxMethod: broadcastTxMethod,
|
|
conns: make([]*websocket.Conn, connections),
|
|
connsBroken: make([]bool, connections),
|
|
logger: log.NewNopLogger(),
|
|
}
|
|
}
|
|
|
|
// SetLogger lets you set your own logger
|
|
func (t *transacter) SetLogger(l log.Logger) {
|
|
t.logger = l
|
|
}
|
|
|
|
// Start opens N = `t.Connections` connections to the target and creates read
|
|
// and write goroutines for each connection.
|
|
func (t *transacter) Start() error {
|
|
t.stopped = false
|
|
|
|
rand.Seed(time.Now().Unix())
|
|
|
|
for i := 0; i < t.Connections; i++ {
|
|
c, _, err := connect(t.Target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.conns[i] = c
|
|
}
|
|
|
|
t.startingWg.Add(t.Connections)
|
|
t.endingWg.Add(2 * t.Connections)
|
|
for i := 0; i < t.Connections; i++ {
|
|
go t.sendLoop(i)
|
|
go t.receiveLoop(i)
|
|
}
|
|
|
|
t.startingWg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop closes the connections.
|
|
func (t *transacter) Stop() {
|
|
t.stopped = true
|
|
t.endingWg.Wait()
|
|
for _, c := range t.conns {
|
|
c.Close()
|
|
}
|
|
}
|
|
|
|
// receiveLoop reads messages from the connection (empty in case of
|
|
// `broadcast_tx_async`).
|
|
func (t *transacter) receiveLoop(connIndex int) {
|
|
c := t.conns[connIndex]
|
|
defer t.endingWg.Done()
|
|
for {
|
|
_, _, err := c.ReadMessage()
|
|
if err != nil {
|
|
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
t.logger.Error(
|
|
fmt.Sprintf("failed to read response on conn %d", connIndex),
|
|
"err",
|
|
err,
|
|
)
|
|
}
|
|
return
|
|
}
|
|
if t.stopped || t.connsBroken[connIndex] {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendLoop generates transactions at a given rate.
|
|
func (t *transacter) sendLoop(connIndex int) {
|
|
started := false
|
|
// Close the starting waitgroup, in the event that this fails to start
|
|
defer func() {
|
|
if !started {
|
|
t.startingWg.Done()
|
|
}
|
|
}()
|
|
c := t.conns[connIndex]
|
|
|
|
c.SetPingHandler(func(message string) error {
|
|
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
|
|
if err == websocket.ErrCloseSent {
|
|
return nil
|
|
} else if e, ok := err.(net.Error); ok && e.Temporary() {
|
|
return nil
|
|
}
|
|
return err
|
|
})
|
|
|
|
logger := t.logger.With("addr", c.RemoteAddr())
|
|
|
|
var txNumber = 0
|
|
|
|
pingsTicker := time.NewTicker(pingPeriod)
|
|
txsTicker := time.NewTicker(1 * time.Second)
|
|
defer func() {
|
|
pingsTicker.Stop()
|
|
txsTicker.Stop()
|
|
t.endingWg.Done()
|
|
}()
|
|
|
|
// hash of the host name is a part of each tx
|
|
var hostnameHash [sha256.Size]byte
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "127.0.0.1"
|
|
}
|
|
hostnameHash = sha256.Sum256([]byte(hostname))
|
|
// each transaction embeds connection index, tx number and hash of the hostname
|
|
// we update the tx number between successive txs
|
|
tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
|
|
txHex := make([]byte, len(tx)*2)
|
|
hex.Encode(txHex, tx)
|
|
|
|
for {
|
|
select {
|
|
case <-txsTicker.C:
|
|
startTime := time.Now()
|
|
endTime := startTime.Add(time.Second)
|
|
numTxSent := t.Rate
|
|
if !started {
|
|
t.startingWg.Done()
|
|
started = true
|
|
}
|
|
|
|
now := time.Now()
|
|
for i := 0; i < t.Rate; i++ {
|
|
// update tx number of the tx, and the corresponding hex
|
|
updateTx(tx, txHex, txNumber)
|
|
paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex})
|
|
if err != nil {
|
|
fmt.Printf("failed to encode params: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
rawParamsJSON := json.RawMessage(paramsJSON)
|
|
|
|
c.SetWriteDeadline(now.Add(sendTimeout))
|
|
err = c.WriteJSON(rpctypes.RPCRequest{
|
|
JSONRPC: "2.0",
|
|
ID: rpctypes.JSONRPCStringID("tm-bench"),
|
|
Method: t.BroadcastTxMethod,
|
|
Params: rawParamsJSON,
|
|
})
|
|
if err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("txs send failed on connection #%d", connIndex))
|
|
t.connsBroken[connIndex] = true
|
|
logger.Error(err.Error())
|
|
return
|
|
}
|
|
|
|
// cache the time.Now() reads to save time.
|
|
if i%5 == 0 {
|
|
now = time.Now()
|
|
if now.After(endTime) {
|
|
// Plus one accounts for sending this tx
|
|
numTxSent = i + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
txNumber++
|
|
}
|
|
|
|
timeToSend := time.Since(startTime)
|
|
logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
|
|
if timeToSend < 1*time.Second {
|
|
sleepTime := time.Second - timeToSend
|
|
logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
|
|
time.Sleep(sleepTime)
|
|
}
|
|
|
|
case <-pingsTicker.C:
|
|
// go-rpc server closes the connection in the absence of pings
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
|
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
|
|
logger.Error(err.Error())
|
|
t.connsBroken[connIndex] = true
|
|
}
|
|
}
|
|
|
|
if t.stopped {
|
|
// To cleanly close a connection, a client should send a close
|
|
// frame and wait for the server to close the connection.
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
|
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
if err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("failed to write close message on conn #%d", connIndex))
|
|
logger.Error(err.Error())
|
|
t.connsBroken[connIndex] = true
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func connect(host string) (*websocket.Conn, *http.Response, error) {
|
|
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
|
|
return websocket.DefaultDialer.Dial(u.String(), nil)
|
|
}
|
|
|
|
func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [sha256.Size]byte) []byte {
|
|
tx := make([]byte, txSize)
|
|
|
|
binary.PutUvarint(tx[:8], uint64(connIndex))
|
|
binary.PutUvarint(tx[8:16], uint64(txNumber))
|
|
copy(tx[16:32], hostnameHash[:16])
|
|
binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
|
|
|
|
// 40-* random data
|
|
if _, err := rand.Read(tx[40:]); err != nil { //nolint: gosec
|
|
panic(errors.Wrap(err, "failed to read random bytes"))
|
|
}
|
|
|
|
return tx
|
|
}
|
|
|
|
// warning, mutates input byte slice
|
|
func updateTx(tx []byte, txHex []byte, txNumber int) {
|
|
binary.PutUvarint(tx[8:16], uint64(txNumber))
|
|
hexUpdate := make([]byte, 16)
|
|
hex.Encode(hexUpdate, tx[8:16])
|
|
for i := 16; i < 32; i++ {
|
|
txHex[i] = hexUpdate[i-16]
|
|
}
|
|
}
|