diff --git a/tools/tm-bench/main.go b/tools/tm-bench/main.go index f8a979fef..bad49cba5 100644 --- a/tools/tm-bench/main.go +++ b/tools/tm-bench/main.go @@ -7,6 +7,7 @@ import ( "math" "os" "strings" + "sync" "text/tabwriter" "time" @@ -247,20 +248,21 @@ func startTransacters( ) []*transacter { transacters := make([]*transacter, len(endpoints)) + wg := sync.WaitGroup{} + wg.Add(len(endpoints)) for i, e := range endpoints { t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod) t.SetLogger(logger) - if err := t.Start(); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - transacters[i] = t - } - - // Wait until all transacters have started firing txs - for _, t := range transacters { - t.WaitUntilAllConnectionsStartedFiringTxs() + go func(i int) { + defer wg.Done() + if err := t.Start(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + transacters[i] = t + }(i) } + wg.Wait() return transacters } diff --git a/tools/tm-bench/transacter.go b/tools/tm-bench/transacter.go index b11c174fa..2834727b5 100644 --- a/tools/tm-bench/transacter.go +++ b/tools/tm-bench/transacter.go @@ -34,11 +34,11 @@ type transacter struct { Connections int BroadcastTxMethod string - conns []*websocket.Conn - connsStarted []bool - connsBroken []bool - wg sync.WaitGroup - stopped bool + conns []*websocket.Conn + connsBroken []bool + startingWg sync.WaitGroup + endingWg sync.WaitGroup + stopped bool logger log.Logger } @@ -51,7 +51,6 @@ func newTransacter(target string, connections, rate int, size int, broadcastTxMe Connections: connections, BroadcastTxMethod: broadcastTxMethod, conns: make([]*websocket.Conn, connections), - connsStarted: make([]bool, connections), connsBroken: make([]bool, connections), logger: log.NewNopLogger(), } @@ -77,35 +76,22 @@ func (t *transacter) Start() error { t.conns[i] = c } - t.wg.Add(2 * t.Connections) + t.startingWg.Add(t.Connections) + t.endingWg.Add(2 * t.Connections) for i := 0; i < t.Connections; i++ { go t.sendLoop(i) go t.receiveLoop(i) } - return nil -} + t.startingWg.Wait() -// WaitUntilAllConnectionsStartedFiringTxs waits until all of this -// transacters connections have begun sending txs at the specified rate -func (t *transacter) WaitUntilAllConnectionsStartedFiringTxs() { - for { - started := true - for i := 0; i < t.Connections; i++ { - if !t.connsStarted[i] && !t.connsBroken[i] { - started = false - } - } - if started { - break - } - } + return nil } // Stop closes the connections. func (t *transacter) Stop() { t.stopped = true - t.wg.Wait() + t.endingWg.Wait() for _, c := range t.conns { c.Close() } @@ -115,7 +101,7 @@ func (t *transacter) Stop() { // `broadcast_tx_async`). func (t *transacter) receiveLoop(connIndex int) { c := t.conns[connIndex] - defer t.wg.Done() + defer t.endingWg.Done() for { _, _, err := c.ReadMessage() if err != nil { @@ -136,6 +122,13 @@ func (t *transacter) receiveLoop(connIndex int) { // sendLoop generates transactions at a given rate. func (t *transacter) sendLoop(connIndex int) { + started := false + // Close the starting waitgroup, in the event that this fails to start + defer func() { + if !started { + t.startingWg.Done() + } + }() c := t.conns[connIndex] c.SetPingHandler(func(message string) error { @@ -157,7 +150,7 @@ func (t *transacter) sendLoop(connIndex int) { defer func() { pingsTicker.Stop() txsTicker.Stop() - t.wg.Done() + t.endingWg.Done() }() // hash of the host name is a part of each tx @@ -174,7 +167,10 @@ func (t *transacter) sendLoop(connIndex int) { startTime := time.Now() endTime := startTime.Add(time.Second) numTxSent := t.Rate - t.connsStarted[connIndex] = true + if !started { + t.startingWg.Done() + started = true + } for i := 0; i < t.Rate; i++ { // each transaction embeds connection index, tx number and hash of the hostname