diff --git a/client/client.go b/client/client.go index ceec11433..19de9e000 100644 --- a/client/client.go +++ b/client/client.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "sync" + "time" . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" @@ -25,47 +26,70 @@ type Client struct { QuitService sync.Mutex // [EB]: is this even used? - reqQueue chan *ReqRes - flushTimer *ThrottleTimer + reqQueue chan *ReqRes + flushTimer *ThrottleTimer + mustConnect bool - mtx sync.Mutex - addr string - conn net.Conn - bufWriter *bufio.Writer - err error - reqSent *list.List - resCb func(*types.Request, *types.Response) // listens to all callbacks + mtx sync.Mutex + addr string + conn net.Conn + err error + reqSent *list.List + resCb func(*types.Request, *types.Response) // listens to all callbacks } -func NewClient(addr string) (*Client, error) { - conn, err := Connect(addr) - if err != nil { - return nil, err - } +func NewClient(addr string, mustConnect bool) (*Client, error) { cli := &Client{ - reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: NewThrottleTimer("Client", flushThrottleMS), + reqQueue: make(chan *ReqRes, reqQueueSize), + flushTimer: NewThrottleTimer("Client", flushThrottleMS), + mustConnect: mustConnect, - conn: conn, - bufWriter: bufio.NewWriter(conn), - reqSent: list.New(), - resCb: nil, + addr: addr, + reqSent: list.New(), + resCb: nil, } cli.QuitService = *NewQuitService(nil, "Client", cli) - cli.Start() // Just start it, it's confusing for callers to remember to start. - return cli, nil + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. + if mustConnect { + return nil, err + } else { + return cli, nil + } } -func (cli *Client) OnStart() error { +func (cli *Client) OnStart() (err error) { cli.QuitService.OnStart() - go cli.sendRequestsRoutine() - go cli.recvResponseRoutine() - return nil + doneCh := make(chan struct{}) + go func() { + RETRY_LOOP: + for { + conn, err_ := Connect(cli.addr) + if err_ != nil { + if cli.mustConnect { + err = err_ // OnStart() will return this. + close(doneCh) + return + } else { + fmt.Printf("tmsp.Client failed to connect to %v. Retrying...\n", cli.addr) + time.Sleep(time.Second * 3) + continue RETRY_LOOP + } + } + go cli.sendRequestsRoutine(conn) + go cli.recvResponseRoutine(conn) + close(doneCh) // OnStart() will return no error. + return + } + }() + <-doneCh + return // err } func (cli *Client) OnStop() { cli.QuitService.OnStop() - cli.conn.Close() + if cli.conn != nil { + cli.conn.Close() + } } // Set listener for all responses @@ -78,7 +102,7 @@ func (cli *Client) SetResponseCallback(resCb Callback) { func (cli *Client) StopForError(err error) { cli.mtx.Lock() - // log.Error("Stopping Client for error.", "error", err) + fmt.Printf("Stopping tmsp.Client for error: %v\n", err.Error()) if cli.err == nil { cli.err = err } @@ -94,7 +118,8 @@ func (cli *Client) Error() error { //---------------------------------------- -func (cli *Client) sendRequestsRoutine() { +func (cli *Client) sendRequestsRoutine(conn net.Conn) { + w := bufio.NewWriter(conn) for { select { case <-cli.flushTimer.Ch: @@ -107,14 +132,14 @@ func (cli *Client) sendRequestsRoutine() { return case reqres := <-cli.reqQueue: cli.willSendReq(reqres) - err := types.WriteMessage(reqres.Request, cli.bufWriter) + err := types.WriteMessage(reqres.Request, w) if err != nil { cli.StopForError(err) return } // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if reqres.Request.Type == types.MessageType_Flush { - err = cli.bufWriter.Flush() + err = w.Flush() if err != nil { cli.StopForError(err) return @@ -124,8 +149,8 @@ func (cli *Client) sendRequestsRoutine() { } } -func (cli *Client) recvResponseRoutine() { - r := bufio.NewReader(cli.conn) // Buffer reads +func (cli *Client) recvResponseRoutine(conn net.Conn) { + r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} err := types.ReadMessage(r, res) diff --git a/tests/test_counter.go b/tests/test_counter.go index 91b23f390..6ed8f4f41 100644 --- a/tests/test_counter.go +++ b/tests/test_counter.go @@ -70,7 +70,7 @@ func startApp() *process.Process { func startClient() *tmspcli.Client { // Start client - client, err := tmspcli.NewClient("tcp://127.0.0.1:46658") + client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true) if err != nil { panic("connecting to counter_app: " + err.Error()) }