@ -1,6 +1,7 @@
package main
package main
import (
import (
"container/ring"
"context"
"context"
"crypto/rand"
"crypto/rand"
"errors"
"errors"
@ -93,34 +94,64 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, siz
// loadProcess processes transactions
// loadProcess processes transactions
func loadProcess ( ctx context . Context , testnet * e2e . Testnet , chTx <- chan types . Tx , chSuccess chan <- types . Tx ) {
func loadProcess ( ctx context . Context , testnet * e2e . Testnet , chTx <- chan types . Tx , chSuccess chan <- types . Tx ) {
// Each worker gets its own client to each node, which allows for some
// concurrency while still bounding it.
clients := map [ string ] * rpchttp . HTTP { }
// Each worker gets its own client to each usable node, which
// allows for some concurrency while still bounding it.
clients := make ( [ ] * rpchttp . HTTP , 0 , len ( testnet . Nodes ) )
for idx := range testnet . Nodes {
// Construct a list of usable nodes for the creating
// load. Don't send load through seed nodes because
// they do not provide the RPC endpoints required to
// broadcast transaction.
if testnet . Nodes [ idx ] . Mode == e2e . ModeSeed {
continue
}
client , err := testnet . Nodes [ idx ] . Client ( )
if err != nil {
continue
}
clients = append ( clients , client )
}
if len ( clients ) == 0 {
panic ( "no clients to process load" )
}
// Put the clients in a ring so they can be used in a
// round-robin fashion.
clientRing := ring . New ( len ( clients ) )
for idx := range clients {
clientRing . Value = clients [ idx ]
clientRing = clientRing . Next ( )
}
var err error
var err error
for tx := range chTx {
node := testnet . RandomNode ( )
client , ok := clients [ node . Name ]
if ! ok {
client , err = node . Client ( )
if err != nil {
for {
select {
case <- ctx . Done ( ) :
return
case tx := <- chTx :
clientRing = clientRing . Next ( )
client := clientRing . Value . ( * rpchttp . HTTP )
if _ , err := client . Health ( ctx ) ; err != nil {
continue
continue
}
}
// check that the node is up
_ , err = client . Health ( ctx )
if err != nil {
if _ , err = client . BroadcastTxSync ( ctx , tx ) ; err != nil {
continue
continue
}
}
clients [ node . Name ] = client
}
select {
case chSuccess <- tx :
continue
case <- ctx . Done ( ) :
return
}
if _ , err = client . BroadcastTxSync ( ctx , tx ) ; err != nil {
continue
}
}
chSuccess <- tx
}
}
}
}