diff --git a/cmd/sim_txs/main.go b/cmd/sim_txs/main.go index 3211aa60d..1e6de7514 100644 --- a/cmd/sim_txs/main.go +++ b/cmd/sim_txs/main.go @@ -20,7 +20,7 @@ func parseFlags() (privKeyHex string, numAccounts int, remote string) { var version bool flag.StringVar(&privKeyHex, "priv-key", "", "Private key bytes in HEX") flag.IntVar(&numAccounts, "num-accounts", 1000, "Deterministically generates this many sub-accounts") - flag.StringVar(&remote, "remote", "http://localhost:46657", "Remote RPC host:port") + flag.StringVar(&remote, "remote", "localhost:46657", "Remote RPC host:port") flag.BoolVar(&version, "version", false, "Version") flag.Parse() if version { @@ -49,7 +49,7 @@ func main() { // Get root account. rootAccount, err := getAccount(remote, root.Address) if err != nil { - fmt.Println(Fmt("Root account does not exist: %X", root.Address)) + fmt.Println(Fmt("Root account %X does not exist: %v", root.Address, err)) return } else { fmt.Println("Root account", rootAccount) @@ -60,7 +60,7 @@ func main() { accounts[0] = rootAccount privAccounts := make([]*acm.PrivAccount, numAccounts+1) privAccounts[0] = root - for i := 1; i < numAccounts; i++ { + for i := 1; i < numAccounts+1; i++ { privAccounts[i] = root.Generate(i) account, err := getAccount(remote, privAccounts[i].Address) if err != nil { @@ -75,11 +75,12 @@ func main() { sendTx := makeRandomTransaction(10, rootAccount.Sequence+1, root, 2, accounts) fmt.Println(sendTx) - wsClient, err := rpcclient.NewWSClient("http://localhost:46657/websocket") + wsClient, err := rpcclient.NewWSClient("ws://" + remote + "/websocket") if err != nil { Exit(Fmt("Failed to establish websocket connection: %v", err)) } - wsClient.Subscribe(types.EventStringAccInput(sendTx.Outputs[0].Address)) + wsClient.Subscribe(types.EventStringAccInput(sendTx.Inputs[0].Address)) + wsClient.Start() go func() { for { @@ -88,6 +89,12 @@ func main() { } }() + err = broadcastSendTx(remote, sendTx) + if err != nil { + Exit(Fmt("Failed to broadcast SendTx: %v", err)) + return + } + /* go func() { for { @@ -112,7 +119,7 @@ func main() { func getAccount(remote string, address []byte) (*acm.Account, error) { // var account *acm.Account = new(acm.Account) - account, err := rpcclient.Call(remote, "get_account", []interface{}{address}, (*acm.Account)(nil)) + account, err := rpcclient.Call("http://"+remote, "get_account", []interface{}{address}, (*acm.Account)(nil)) if err != nil { return nil, err } @@ -124,7 +131,7 @@ func getAccount(remote string, address []byte) (*acm.Account, error) { } func broadcastSendTx(remote string, sendTx *types.SendTx) error { - receipt, err := rpcclient.Call(remote, "broadcast_tx", []interface{}{sendTx}, (*ctypes.Receipt)(nil)) + receipt, err := rpcclient.Call("http://"+remote, "broadcast_tx", []interface{}{sendTx}, (*ctypes.Receipt)(nil)) if err != nil { return err } @@ -138,6 +145,10 @@ func broadcastSendTx(remote string, sendTx *types.SendTx) error { // inputPriv: input privAccount func makeRandomTransaction(balance int64, sequence int, inputPriv *acm.PrivAccount, sendCount int, accounts []*acm.Account) *types.SendTx { + if sendCount >= len(accounts) { + PanicSanity("Cannot make tx with sendCount >= len(accounts)") + } + // Remember which accounts were chosen accMap := map[string]struct{}{} accMap[string(inputPriv.Address)] = struct{}{} diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go index 3e81b1c55..6480ac96a 100644 --- a/rpc/client/ws_client.go +++ b/rpc/client/ws_client.go @@ -9,7 +9,6 @@ import ( . "github.com/tendermint/tendermint/common" _ "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/rpc/types" - "github.com/tendermint/tendermint/wire" ) const wsEventsChannelCapacity = 10 @@ -52,26 +51,21 @@ func (wsc *WSClient) receiveEventsRoutine() { for { _, data, err := wsc.ReadMessage() if err != nil { - log.Info("WSClient failed to read message: %v", err) + log.Info(Fmt("WSClient failed to read message: %v", err)) wsc.Stop() break } else { var response rpctypes.RPCResponse if err := json.Unmarshal(data, &response); err != nil { - log.Info("WSClient failed to parse message: %v", err) + log.Info(Fmt("WSClient failed to parse message: %v", err)) wsc.Stop() break } if strings.HasSuffix(response.Id, "#event") { - var eventResult rpctypes.RPCEventResult - var err error - wire.ReadJSONObject(&eventResult, response.Result, &err) - if err != nil { - log.Info("WSClient failed to parse RPCEventResult: %v", err) - wsc.Stop() - break - } - wsc.EventsCh <- eventResult + result := response.Result.(map[string]interface{}) + event := result["event"].(string) + data := result["data"] + wsc.EventsCh <- rpctypes.RPCEventResult{event, data} } else { wsc.ResponsesCh <- response } diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go deleted file mode 100644 index c64636476..000000000 --- a/rpc/core_client/ws_client.go +++ /dev/null @@ -1,73 +0,0 @@ -package core_client - -import ( - "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket" - rpctypes "github.com/tendermint/tendermint/rpc/types" - "net/http" -) - -// A websocket client subscribes and unsubscribes to events -type WSClient struct { - host string - conn *websocket.Conn -} - -// create a new connection -func NewWSClient(addr string) *WSClient { - return &WSClient{ - host: addr, - } -} - -func (wsc *WSClient) Dial() (*http.Response, error) { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - conn, r, err := dialer.Dial(wsc.host, rHeader) - if err != nil { - return r, err - } - wsc.conn = conn - return r, nil -} - -// subscribe to an event -func (wsc *WSClient) Subscribe(eventid string) error { - return wsc.conn.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - Id: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) -} - -// unsubscribe from an event -func (wsc *WSClient) Unsubscribe(eventid string) error { - return wsc.conn.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - Id: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) -} - -type WSMsg struct { - Data []byte - Error error -} - -// returns a channel from which messages can be pulled -// from a go routine that reads the socket. -// if the ws returns an error (eg. closes), we return -func (wsc *WSClient) Read() chan *WSMsg { - ch := make(chan *WSMsg) - go func() { - for { - _, p, err := wsc.conn.ReadMessage() - ch <- &WSMsg{p, err} - if err != nil { - return - } - } - }() - return ch -} diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index a372c5cb7..153b778fa 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -220,7 +220,7 @@ type WSConnection struct { baseConn *websocket.Conn writeChan chan RPCResponse readTimeout *time.Timer - pingTicker *time.Timer + pingTicker *time.Ticker funcMap map[string]*RPCFunc evsw *events.EventSwitch @@ -248,7 +248,7 @@ func (wsc *WSConnection) OnStart() { // Custom Ping handler to touch readTimeout wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTimer(time.Second * wsPingTickerSeconds) + wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) wsc.baseConn.SetPingHandler(func(m string) error { wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)