Browse Source

s/TMSPClient/Client/g

pull/1780/head
Jae Kwon 8 years ago
parent
commit
b2bd661a61
2 changed files with 36 additions and 36 deletions
  1. +30
    -30
      client/client.go
  2. +6
    -6
      tests/test_counter.go

+ 30
- 30
client/client.go View File

@ -21,7 +21,7 @@ type Callback func(*types.Request, *types.Response)
// This is goroutine-safe, but users should beware that // This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced // the application in general is not meant to be interfaced
// with concurrent callers. // with concurrent callers.
type TMSPClient struct {
type Client struct {
QuitService QuitService
sync.Mutex // [EB]: is this even used? sync.Mutex // [EB]: is this even used?
@ -37,48 +37,48 @@ type TMSPClient struct {
resCb func(*types.Request, *types.Response) // listens to all callbacks resCb func(*types.Request, *types.Response) // listens to all callbacks
} }
func NewTMSPClient(addr string) (*TMSPClient, error) {
func NewClient(addr string) (*Client, error) {
conn, err := Connect(addr) conn, err := Connect(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cli := &TMSPClient{
cli := &Client{
reqQueue: make(chan *ReqRes, reqQueueSize), reqQueue: make(chan *ReqRes, reqQueueSize),
flushTimer: NewThrottleTimer("TMSPClient", flushThrottleMS),
flushTimer: NewThrottleTimer("Client", flushThrottleMS),
conn: conn, conn: conn,
bufWriter: bufio.NewWriter(conn), bufWriter: bufio.NewWriter(conn),
reqSent: list.New(), reqSent: list.New(),
resCb: nil, resCb: nil,
} }
cli.QuitService = *NewQuitService(nil, "TMSPClient", cli)
cli.QuitService = *NewQuitService(nil, "Client", cli)
cli.Start() // Just start it, it's confusing for callers to remember to start. cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, nil return cli, nil
} }
func (cli *TMSPClient) OnStart() error {
func (cli *Client) OnStart() error {
cli.QuitService.OnStart() cli.QuitService.OnStart()
go cli.sendRequestsRoutine() go cli.sendRequestsRoutine()
go cli.recvResponseRoutine() go cli.recvResponseRoutine()
return nil return nil
} }
func (cli *TMSPClient) OnStop() {
func (cli *Client) OnStop() {
cli.QuitService.OnStop() cli.QuitService.OnStop()
cli.conn.Close() cli.conn.Close()
} }
// Set listener for all responses // Set listener for all responses
// NOTE: callback may get internally generated flush responses. // NOTE: callback may get internally generated flush responses.
func (cli *TMSPClient) SetResponseCallback(resCb Callback) {
func (cli *Client) SetResponseCallback(resCb Callback) {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
cli.resCb = resCb cli.resCb = resCb
} }
func (cli *TMSPClient) StopForError(err error) {
func (cli *Client) StopForError(err error) {
cli.mtx.Lock() cli.mtx.Lock()
// log.Error("Stopping TMSPClient for error.", "error", err)
// log.Error("Stopping Client for error.", "error", err)
if cli.err == nil { if cli.err == nil {
cli.err = err cli.err = err
} }
@ -86,7 +86,7 @@ func (cli *TMSPClient) StopForError(err error) {
cli.Stop() cli.Stop()
} }
func (cli *TMSPClient) Error() error {
func (cli *Client) Error() error {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
return cli.err return cli.err
@ -94,7 +94,7 @@ func (cli *TMSPClient) Error() error {
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) sendRequestsRoutine() {
func (cli *Client) sendRequestsRoutine() {
for { for {
select { select {
case <-cli.flushTimer.Ch: case <-cli.flushTimer.Ch:
@ -124,7 +124,7 @@ func (cli *TMSPClient) sendRequestsRoutine() {
} }
} }
func (cli *TMSPClient) recvResponseRoutine() {
func (cli *Client) recvResponseRoutine() {
r := bufio.NewReader(cli.conn) // Buffer reads r := bufio.NewReader(cli.conn) // Buffer reads
for { for {
var res = &types.Response{} var res = &types.Response{}
@ -147,13 +147,13 @@ func (cli *TMSPClient) recvResponseRoutine() {
} }
} }
func (cli *TMSPClient) willSendReq(reqres *ReqRes) {
func (cli *Client) willSendReq(reqres *ReqRes) {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
cli.reqSent.PushBack(reqres) cli.reqSent.PushBack(reqres)
} }
func (cli *TMSPClient) didRecvResponse(res *types.Response) error {
func (cli *Client) didRecvResponse(res *types.Response) error {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock() defer cli.mtx.Unlock()
@ -187,42 +187,42 @@ func (cli *TMSPClient) didRecvResponse(res *types.Response) error {
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) EchoAsync(msg string) *ReqRes {
func (cli *Client) EchoAsync(msg string) *ReqRes {
return cli.queueRequest(types.RequestEcho(msg)) return cli.queueRequest(types.RequestEcho(msg))
} }
func (cli *TMSPClient) FlushAsync() *ReqRes {
func (cli *Client) FlushAsync() *ReqRes {
return cli.queueRequest(types.RequestFlush()) return cli.queueRequest(types.RequestFlush())
} }
func (cli *TMSPClient) SetOptionAsync(key string, value string) *ReqRes {
func (cli *Client) SetOptionAsync(key string, value string) *ReqRes {
return cli.queueRequest(types.RequestSetOption(key, value)) return cli.queueRequest(types.RequestSetOption(key, value))
} }
func (cli *TMSPClient) AppendTxAsync(tx []byte) *ReqRes {
func (cli *Client) AppendTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.RequestAppendTx(tx)) return cli.queueRequest(types.RequestAppendTx(tx))
} }
func (cli *TMSPClient) CheckTxAsync(tx []byte) *ReqRes {
func (cli *Client) CheckTxAsync(tx []byte) *ReqRes {
return cli.queueRequest(types.RequestCheckTx(tx)) return cli.queueRequest(types.RequestCheckTx(tx))
} }
func (cli *TMSPClient) CommitAsync() *ReqRes {
func (cli *Client) CommitAsync() *ReqRes {
return cli.queueRequest(types.RequestCommit()) return cli.queueRequest(types.RequestCommit())
} }
func (cli *TMSPClient) QueryAsync(query []byte) *ReqRes {
func (cli *Client) QueryAsync(query []byte) *ReqRes {
return cli.queueRequest(types.RequestQuery(query)) return cli.queueRequest(types.RequestQuery(query))
} }
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) FlushSync() error {
func (cli *Client) FlushSync() error {
cli.queueRequest(types.RequestFlush()).Wait() cli.queueRequest(types.RequestFlush()).Wait()
return cli.err return cli.err
} }
func (cli *TMSPClient) InfoSync() (info string, err error) {
func (cli *Client) InfoSync() (info string, err error) {
reqres := cli.queueRequest(types.RequestInfo()) reqres := cli.queueRequest(types.RequestInfo())
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -231,7 +231,7 @@ func (cli *TMSPClient) InfoSync() (info string, err error) {
return string(reqres.Response.Data), nil return string(reqres.Response.Data), nil
} }
func (cli *TMSPClient) SetOptionSync(key string, value string) (log string, err error) {
func (cli *Client) SetOptionSync(key string, value string) (log string, err error) {
reqres := cli.queueRequest(types.RequestSetOption(key, value)) reqres := cli.queueRequest(types.RequestSetOption(key, value))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -240,7 +240,7 @@ func (cli *TMSPClient) SetOptionSync(key string, value string) (log string, err
return reqres.Response.Log, nil return reqres.Response.Log, nil
} }
func (cli *TMSPClient) AppendTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
func (cli *Client) AppendTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
reqres := cli.queueRequest(types.RequestAppendTx(tx)) reqres := cli.queueRequest(types.RequestAppendTx(tx))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -250,7 +250,7 @@ func (cli *TMSPClient) AppendTxSync(tx []byte) (code types.CodeType, result []by
return res.Code, res.Data, res.Log, nil return res.Code, res.Data, res.Log, nil
} }
func (cli *TMSPClient) CheckTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
func (cli *Client) CheckTxSync(tx []byte) (code types.CodeType, result []byte, log string, err error) {
reqres := cli.queueRequest(types.RequestCheckTx(tx)) reqres := cli.queueRequest(types.RequestCheckTx(tx))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -260,7 +260,7 @@ func (cli *TMSPClient) CheckTxSync(tx []byte) (code types.CodeType, result []byt
return res.Code, res.Data, res.Log, nil return res.Code, res.Data, res.Log, nil
} }
func (cli *TMSPClient) CommitSync() (hash []byte, log string, err error) {
func (cli *Client) CommitSync() (hash []byte, log string, err error) {
reqres := cli.queueRequest(types.RequestCommit()) reqres := cli.queueRequest(types.RequestCommit())
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -270,7 +270,7 @@ func (cli *TMSPClient) CommitSync() (hash []byte, log string, err error) {
return res.Data, res.Log, nil return res.Data, res.Log, nil
} }
func (cli *TMSPClient) QuerySync(query []byte) (code types.CodeType, result []byte, log string, err error) {
func (cli *Client) QuerySync(query []byte) (code types.CodeType, result []byte, log string, err error) {
reqres := cli.queueRequest(types.RequestQuery(query)) reqres := cli.queueRequest(types.RequestQuery(query))
cli.FlushSync() cli.FlushSync()
if cli.err != nil { if cli.err != nil {
@ -282,7 +282,7 @@ func (cli *TMSPClient) QuerySync(query []byte) (code types.CodeType, result []by
//---------------------------------------- //----------------------------------------
func (cli *TMSPClient) queueRequest(req *types.Request) *ReqRes {
func (cli *Client) queueRequest(req *types.Request) *ReqRes {
reqres := newReqRes(req) reqres := newReqRes(req)
// TODO: set cli.err if reqQueue times out // TODO: set cli.err if reqQueue times out
cli.reqQueue <- reqres cli.reqQueue <- reqres


+ 6
- 6
tests/test_counter.go View File

@ -68,23 +68,23 @@ func startApp() *process.Process {
return proc return proc
} }
func startClient() *tmspcli.TMSPClient {
func startClient() *tmspcli.Client {
// Start client // Start client
client, err := tmspcli.NewTMSPClient("tcp://127.0.0.1:46658")
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658")
if err != nil { if err != nil {
panic("connecting to counter_app: " + err.Error()) panic("connecting to counter_app: " + err.Error())
} }
return client return client
} }
func setOption(client *tmspcli.TMSPClient, key, value string) {
func setOption(client *tmspcli.Client, key, value string) {
log, err := client.SetOptionSync(key, value) log, err := client.SetOptionSync(key, value)
if err != nil { if err != nil {
panic(Fmt("setting %v=%v: %v\nlog: %v", key, value, err, log)) panic(Fmt("setting %v=%v: %v\nlog: %v", key, value, err, log))
} }
} }
func commit(client *tmspcli.TMSPClient, hashExp []byte) {
func commit(client *tmspcli.Client, hashExp []byte) {
hash, log, err := client.CommitSync() hash, log, err := client.CommitSync()
if err != nil { if err != nil {
panic(Fmt("committing %v\nlog: %v", err, log)) panic(Fmt("committing %v\nlog: %v", err, log))
@ -95,7 +95,7 @@ func commit(client *tmspcli.TMSPClient, hashExp []byte) {
} }
} }
func appendTx(client *tmspcli.TMSPClient, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
func appendTx(client *tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
code, data, log, err := client.AppendTxSync(txBytes) code, data, log, err := client.AppendTxSync(txBytes)
if err != nil { if err != nil {
panic(Fmt("appending tx %X: %v\nlog: %v", txBytes, err, log)) panic(Fmt("appending tx %X: %v\nlog: %v", txBytes, err, log))
@ -110,7 +110,7 @@ func appendTx(client *tmspcli.TMSPClient, txBytes []byte, codeExp types.CodeType
} }
} }
func checkTx(client *tmspcli.TMSPClient, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
func checkTx(client *tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
code, data, log, err := client.CheckTxSync(txBytes) code, data, log, err := client.CheckTxSync(txBytes)
if err != nil { if err != nil {
panic(Fmt("checking tx %X: %v\nlog: %v", txBytes, err, log)) panic(Fmt("checking tx %X: %v\nlog: %v", txBytes, err, log))


Loading…
Cancel
Save