From 1c4dbe30d40bb4c1dc59a2247d5113b78702a014 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Sat, 3 Apr 2021 14:25:15 -0400 Subject: [PATCH] abci: change client to use multi-reader mutexes (#6306) --- abci/client/client.go | 8 ++++---- abci/client/grpc_client.go | 8 ++++---- abci/client/local_client.go | 26 ++++++++++++++------------ abci/client/socket_client.go | 8 ++++---- consensus/byzantine_test.go | 2 +- consensus/common_test.go | 2 +- consensus/reactor_test.go | 2 +- proxy/client.go | 4 ++-- 8 files changed, 31 insertions(+), 29 deletions(-) diff --git a/abci/client/client.go b/abci/client/client.go index 4cc5dafff..bcba3bec3 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -87,7 +87,7 @@ type ReqRes struct { *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. - mtx tmsync.Mutex + mtx tmsync.RWMutex done bool // Gets set to true once *after* WaitGroup.Done(). cb func(*types.Response) // A single callback that may be set. } @@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() { // // ref: https://github.com/tendermint/tendermint/issues/5439 func (r *ReqRes) GetCallback() func(*types.Response) { - r.mtx.Lock() - defer r.mtx.Unlock() + r.mtx.RLock() + defer r.mtx.RUnlock() return r.cb } // SetDone marks the ReqRes object as done. func (r *ReqRes) SetDone() { r.mtx.Lock() + defer r.mtx.Unlock() r.done = true - r.mtx.Unlock() } func waitGroup1() (wg *sync.WaitGroup) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index b10eebd9e..18d9cb168 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -24,7 +24,7 @@ type grpcClient struct { conn *grpc.ClientConn chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool - mtx tmsync.Mutex + mtx tmsync.RWMutex addr string err error resCb func(*types.Request, *types.Response) // listens to all callbacks @@ -149,8 +149,8 @@ func (cli *grpcClient) StopForError(err error) { } func (cli *grpcClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() + cli.mtx.RLock() + defer cli.mtx.RUnlock() return cli.err } @@ -158,8 +158,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() + defer cli.mtx.Unlock() cli.resCb = resCb - cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 92ca0804f..d25a98b4d 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,7 +15,7 @@ import ( type localClient struct { service.BaseService - mtx *tmsync.Mutex + mtx *tmsync.RWMutex types.Application Callback } @@ -26,22 +26,24 @@ var _ Client = (*localClient)(nil) // methods of the given app. // // Both Async and Sync methods ignore the given context.Context parameter. -func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { +func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client { if mtx == nil { - mtx = new(tmsync.Mutex) + mtx = &tmsync.RWMutex{} } + cli := &localClient{ mtx: mtx, Application: app, } + cli.BaseService = *service.NewBaseService(nil, "localClient", cli) return cli } func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() + defer app.mtx.Unlock() app.Callback = cb - app.mtx.Unlock() } // TODO: change types.Application to include Error()? @@ -65,8 +67,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err } func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Info(req) return app.callback( @@ -98,8 +100,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck } func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Query(req) return app.callback( @@ -213,8 +215,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon } func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Info(req) return &res, nil @@ -247,8 +249,8 @@ func (app *localClient) QuerySync( ctx context.Context, req types.RequestQuery, ) (*types.ResponseQuery, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Query(req) return &res, nil diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 0569f4c6b..3e4034902 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -43,7 +43,7 @@ type socketClient struct { reqQueue chan *reqResWithContext flushTimer *timer.ThrottleTimer - mtx tmsync.Mutex + mtx tmsync.RWMutex err error reqSent *list.List // list of requests sent, waiting for response resCb func(*types.Request, *types.Response) // called on all requests, if set. @@ -108,8 +108,8 @@ func (cli *socketClient) OnStop() { // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() + cli.mtx.RLock() + defer cli.mtx.RUnlock() return cli.err } @@ -119,8 +119,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() + defer cli.mtx.Unlock() cli.resCb = resCb - cli.mtx.Unlock() } //---------------------------------------- diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index f0b65ee57..46b752d32 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -58,7 +58,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/common_test.go b/consensus/common_test.go index bd844982c..953339c20 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -392,7 +392,7 @@ func newStateWithConfigAndBlockStore( blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a53561cff..3d14533c5 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -303,7 +303,7 @@ func TestReactorWithEvidence(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/proxy/client.go b/proxy/client.go index 1dc6d8853..27baa9738 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -20,7 +20,7 @@ type ClientCreator interface { // local proxy uses a mutex on an in-proc app type localClientCreator struct { - mtx *tmsync.Mutex + mtx *tmsync.RWMutex app types.Application } @@ -28,7 +28,7 @@ type localClientCreator struct { // which will be running locally. func NewLocalClientCreator(app types.Application) ClientCreator { return &localClientCreator{ - mtx: new(tmsync.Mutex), + mtx: new(tmsync.RWMutex), app: app, } }