diff --git a/abci/client/client.go b/abci/client/client.go index c5c1ab219..0bfe7d497 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -81,7 +81,7 @@ type ReqRes struct { *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. - mtx tmsync.Mutex + mtx tmsync.RWMutex done bool // Gets set to true once *after* WaitGroup.Done(). cb func(*types.Response) // A single callback that may be set. } @@ -131,16 +131,16 @@ func (r *ReqRes) InvokeCallback() { // // ref: https://github.com/tendermint/tendermint/issues/5439 func (r *ReqRes) GetCallback() func(*types.Response) { - r.mtx.Lock() - defer r.mtx.Unlock() + r.mtx.RLock() + defer r.mtx.RUnlock() return r.cb } // SetDone marks the ReqRes object as done. func (r *ReqRes) SetDone() { r.mtx.Lock() + defer r.mtx.Unlock() r.done = true - r.mtx.Unlock() } func waitGroup1() (wg *sync.WaitGroup) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index b375c6cc9..71f460d2b 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -27,7 +27,7 @@ type grpcClient struct { conn *grpc.ClientConn chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool - mtx tmsync.Mutex + mtx tmsync.RWMutex addr string err error resCb func(*types.Request, *types.Response) // listens to all callbacks @@ -146,8 +146,8 @@ func (cli *grpcClient) StopForError(err error) { } func (cli *grpcClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() + cli.mtx.RLock() + defer cli.mtx.RUnlock() return cli.err } @@ -155,8 +155,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() + defer cli.mtx.Unlock() cli.resCb = resCb - cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index b3bdc451d..09e644641 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,27 +15,35 @@ var _ Client = (*localClient)(nil) type localClient struct { service.BaseService - mtx *tmsync.Mutex + mtx *tmsync.RWMutex types.Application Callback } -func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { +var _ Client = (*localClient)(nil) + +// NewLocalClient creates a local client, which will be directly calling the +// methods of the given app. +// +// Both Async and Sync methods ignore the given context.Context parameter. +func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client { if mtx == nil { - mtx = new(tmsync.Mutex) + mtx = &tmsync.RWMutex{} } + cli := &localClient{ mtx: mtx, Application: app, } + cli.BaseService = *service.NewBaseService(nil, "localClient", cli) return cli } func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() + defer app.mtx.Unlock() app.Callback = cb - app.mtx.Unlock() } // TODO: change types.Application to include Error()? @@ -59,8 +67,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes { } func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Info(req) return app.callback( @@ -103,8 +111,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { } func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Query(req) return app.callback( @@ -212,8 +220,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) { } func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Info(req) return &res, nil @@ -244,8 +252,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh } func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - app.mtx.Lock() - defer app.mtx.Unlock() + app.mtx.RLock() + defer app.mtx.RUnlock() res := app.Application.Query(req) return &res, nil diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index a369f878c..fd32315a6 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -34,7 +34,7 @@ type socketClient struct { reqQueue chan *ReqRes flushTimer *timer.ThrottleTimer - mtx tmsync.Mutex + mtx tmsync.RWMutex err error reqSent *list.List // list of requests sent, waiting for response resCb func(*types.Request, *types.Response) // called on all requests, if set. @@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() { // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() + cli.mtx.RLock() + defer cli.mtx.RUnlock() return cli.err } @@ -110,8 +110,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() + defer cli.mtx.Unlock() cli.resCb = resCb - cli.mtx.Unlock() } //---------------------------------------- diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index f5e372ae2..7df700416 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -59,7 +59,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/common_test.go b/consensus/common_test.go index 60bb9eed5..62bc89633 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -389,7 +389,7 @@ func newStateWithConfigAndBlockStore( blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index f23ec727d..39a730d13 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -153,7 +153,7 @@ func TestReactorWithEvidence(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.Mutex) + mtx := new(tmsync.RWMutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/proxy/client.go b/proxy/client.go index 1dc6d8853..27baa9738 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -20,7 +20,7 @@ type ClientCreator interface { // local proxy uses a mutex on an in-proc app type localClientCreator struct { - mtx *tmsync.Mutex + mtx *tmsync.RWMutex app types.Application } @@ -28,7 +28,7 @@ type localClientCreator struct { // which will be running locally. func NewLocalClientCreator(app types.Application) ClientCreator { return &localClientCreator{ - mtx: new(tmsync.Mutex), + mtx: new(tmsync.RWMutex), app: app, } }