You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
7.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "math"
  7. "os"
  8. "strings"
  9. "text/tabwriter"
  10. "time"
  11. "github.com/go-kit/kit/log/term"
  12. metrics "github.com/rcrowley/go-metrics"
  13. "github.com/tendermint/tendermint/libs/log"
  14. tmrpc "github.com/tendermint/tendermint/rpc/client"
  15. )
  16. var logger = log.NewNopLogger()
  17. type statistics struct {
  18. TxsThroughput metrics.Histogram `json:"txs_per_sec"`
  19. BlocksThroughput metrics.Histogram `json:"blocks_per_sec"`
  20. }
  21. func main() {
  22. var duration, txsRate, connections, txSize int
  23. var verbose bool
  24. var outputFormat, broadcastTxMethod string
  25. flagSet := flag.NewFlagSet("tm-bench", flag.ExitOnError)
  26. flagSet.IntVar(&connections, "c", 1, "Connections to keep open per endpoint")
  27. flagSet.IntVar(&duration, "T", 10, "Exit after the specified amount of time in seconds")
  28. flagSet.IntVar(&txsRate, "r", 1000, "Txs per second to send in a connection")
  29. flagSet.IntVar(&txSize, "s", 250, "The size of a transaction in bytes.")
  30. flagSet.StringVar(&outputFormat, "output-format", "plain", "Output format: plain or json")
  31. flagSet.StringVar(&broadcastTxMethod, "broadcast-tx-method", "async", "Broadcast method: async (no guarantees; fastest), sync (ensures tx is checked) or commit (ensures tx is checked and committed; slowest)")
  32. flagSet.BoolVar(&verbose, "v", false, "Verbose output")
  33. flagSet.Usage = func() {
  34. fmt.Println(`Tendermint blockchain benchmarking tool.
  35. Usage:
  36. tm-bench [-c 1] [-T 10] [-r 1000] [-s 250] [endpoints] [-output-format <plain|json> [-broadcast-tx-method <async|sync|commit>]]
  37. Examples:
  38. tm-bench localhost:26657`)
  39. fmt.Println("Flags:")
  40. flagSet.PrintDefaults()
  41. }
  42. flagSet.Parse(os.Args[1:])
  43. if flagSet.NArg() == 0 {
  44. flagSet.Usage()
  45. os.Exit(1)
  46. }
  47. if verbose {
  48. if outputFormat == "json" {
  49. fmt.Fprintln(os.Stderr, "Verbose mode not supported with json output.")
  50. os.Exit(1)
  51. }
  52. // Color errors red
  53. colorFn := func(keyvals ...interface{}) term.FgBgColor {
  54. for i := 1; i < len(keyvals); i += 2 {
  55. if _, ok := keyvals[i].(error); ok {
  56. return term.FgBgColor{Fg: term.White, Bg: term.Red}
  57. }
  58. }
  59. return term.FgBgColor{}
  60. }
  61. logger = log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn)
  62. fmt.Printf("Running %ds test @ %s\n", duration, flagSet.Arg(0))
  63. }
  64. if broadcastTxMethod != "async" &&
  65. broadcastTxMethod != "sync" &&
  66. broadcastTxMethod != "commit" {
  67. fmt.Fprintln(
  68. os.Stderr,
  69. "broadcast-tx-method should be either 'sync', 'async' or 'commit'.",
  70. )
  71. os.Exit(1)
  72. }
  73. var (
  74. endpoints = strings.Split(flagSet.Arg(0), ",")
  75. client = tmrpc.NewHTTP(endpoints[0], "/websocket")
  76. initialHeight = latestBlockHeight(client)
  77. )
  78. logger.Info("Latest block height", "h", initialHeight)
  79. transacters := startTransacters(
  80. endpoints,
  81. connections,
  82. txsRate,
  83. txSize,
  84. "broadcast_tx_"+broadcastTxMethod,
  85. )
  86. // record time start
  87. timeStart := time.Now()
  88. logger.Info("Time last transacter started", "t", timeStart)
  89. endTime := time.Duration(duration) * time.Second
  90. <-time.After(endTime)
  91. for i, t := range transacters {
  92. t.Stop()
  93. numCrashes := countCrashes(t.connsBroken)
  94. if numCrashes != 0 {
  95. fmt.Printf("%d connections crashed on transacter #%d\n", numCrashes, i)
  96. }
  97. }
  98. timeStop := time.Now()
  99. logger.Info("Time stopped", "t", timeStop)
  100. stats, err := calculateStatistics(
  101. client,
  102. initialHeight,
  103. timeStart,
  104. timeStop,
  105. duration,
  106. )
  107. if err != nil {
  108. fmt.Fprintln(os.Stderr, err)
  109. os.Exit(1)
  110. }
  111. printStatistics(stats, outputFormat)
  112. }
  113. func latestBlockHeight(client tmrpc.Client) int64 {
  114. status, err := client.Status()
  115. if err != nil {
  116. fmt.Fprintln(os.Stderr, err)
  117. os.Exit(1)
  118. }
  119. return status.SyncInfo.LatestBlockHeight
  120. }
  121. func countCrashes(crashes []bool) int {
  122. count := 0
  123. for i := 0; i < len(crashes); i++ {
  124. if crashes[i] {
  125. count++
  126. }
  127. }
  128. return count
  129. }
  130. // calculateStatistics calculates the tx / second, and blocks / second based
  131. // off of the number the transactions and number of blocks that occurred from
  132. // the start block, and the end time.
  133. func calculateStatistics(
  134. client tmrpc.Client,
  135. minHeight int64,
  136. timeStart, timeStop time.Time,
  137. duration int,
  138. ) (*statistics, error) {
  139. stats := &statistics{
  140. BlocksThroughput: metrics.NewHistogram(metrics.NewUniformSample(1000)),
  141. TxsThroughput: metrics.NewHistogram(metrics.NewUniformSample(1000)),
  142. }
  143. // get blocks between minHeight and last height
  144. // This returns max(minHeight,(last_height - 20)) to last_height
  145. info, err := client.BlockchainInfo(minHeight, 0)
  146. if err != nil {
  147. return nil, err
  148. }
  149. var (
  150. blockMetas = info.BlockMetas
  151. lastHeight = info.LastHeight
  152. diff = lastHeight - minHeight
  153. offset = len(blockMetas)
  154. )
  155. for offset < int(diff) {
  156. // get blocks between minHeight and last height
  157. info, err := client.BlockchainInfo(minHeight, lastHeight-int64(offset))
  158. if err != nil {
  159. return nil, err
  160. }
  161. blockMetas = append(blockMetas, info.BlockMetas...)
  162. offset = len(blockMetas)
  163. }
  164. var (
  165. numBlocksPerSec = make(map[int64]int64)
  166. numTxsPerSec = make(map[int64]int64)
  167. )
  168. // because during some seconds blocks won't be created...
  169. for i := int64(0); i < int64(duration); i++ {
  170. numBlocksPerSec[i] = 0
  171. numTxsPerSec[i] = 0
  172. }
  173. // iterates from max height to min height
  174. for i, blockMeta := range blockMetas {
  175. // check if block was created after timeStart
  176. if blockMeta.Header.Time.Before(timeStart) {
  177. break
  178. }
  179. // check if block was created before timeStop
  180. if blockMeta.Header.Time.After(timeStop) {
  181. continue
  182. }
  183. sec := secondsSinceTimeStart(timeStart, blockMeta.Header.Time)
  184. // increase number of blocks for that second
  185. numBlocksPerSec[sec]++
  186. // increase number of txs for that second
  187. numTxsPerSec[sec] += blockMeta.Header.NumTxs
  188. logger.Debug(fmt.Sprintf("%d txs in block %d, height %d", blockMeta.Header.NumTxs, i, blockMeta.Header.Height))
  189. }
  190. for _, n := range numBlocksPerSec {
  191. stats.BlocksThroughput.Update(n)
  192. }
  193. for _, n := range numTxsPerSec {
  194. stats.TxsThroughput.Update(n)
  195. }
  196. return stats, nil
  197. }
  198. func secondsSinceTimeStart(timeStart, timePassed time.Time) int64 {
  199. return int64(math.Round(timePassed.Sub(timeStart).Seconds()))
  200. }
  201. func startTransacters(
  202. endpoints []string,
  203. connections,
  204. txsRate int,
  205. txSize int,
  206. broadcastTxMethod string,
  207. ) []*transacter {
  208. transacters := make([]*transacter, len(endpoints))
  209. for i, e := range endpoints {
  210. t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod)
  211. t.SetLogger(logger)
  212. if err := t.Start(); err != nil {
  213. fmt.Fprintln(os.Stderr, err)
  214. os.Exit(1)
  215. }
  216. transacters[i] = t
  217. }
  218. return transacters
  219. }
  220. func printStatistics(stats *statistics, outputFormat string) {
  221. if outputFormat == "json" {
  222. result, err := json.Marshal(struct {
  223. TxsThroughput float64 `json:"txs_per_sec_avg"`
  224. BlocksThroughput float64 `json:"blocks_per_sec_avg"`
  225. }{stats.TxsThroughput.Mean(), stats.BlocksThroughput.Mean()})
  226. if err != nil {
  227. fmt.Fprintln(os.Stderr, err)
  228. os.Exit(1)
  229. }
  230. fmt.Println(string(result))
  231. } else {
  232. w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', 0)
  233. fmt.Fprintln(w, "Stats\tAvg\tStdDev\tMax\tTotal\t")
  234. fmt.Fprintln(
  235. w,
  236. fmt.Sprintf(
  237. "Txs/sec\t%.0f\t%.0f\t%d\t%d\t",
  238. stats.TxsThroughput.Mean(),
  239. stats.TxsThroughput.StdDev(),
  240. stats.TxsThroughput.Max(),
  241. stats.TxsThroughput.Sum(),
  242. ),
  243. )
  244. fmt.Fprintln(
  245. w,
  246. fmt.Sprintf("Blocks/sec\t%.3f\t%.3f\t%d\t%d\t",
  247. stats.BlocksThroughput.Mean(),
  248. stats.BlocksThroughput.StdDev(),
  249. stats.BlocksThroughput.Max(),
  250. stats.BlocksThroughput.Sum(),
  251. ),
  252. )
  253. w.Flush()
  254. }
  255. }