diff --git a/abci/client/client.go b/abci/client/client.go index 93f8b9293..a38c7f81b 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -87,7 +87,7 @@ type ReqRes struct { *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. - mtx tmsync.RWMutex + mtx tmsync.Mutex done bool // Gets set to true once *after* WaitGroup.Done(). cb func(*types.Response) // A single callback that may be set. } @@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() { // // ref: https://github.com/tendermint/tendermint/issues/5439 func (r *ReqRes) GetCallback() func(*types.Response) { - r.mtx.RLock() - defer r.mtx.RUnlock() + r.mtx.Lock() + defer r.mtx.Unlock() return r.cb } // SetDone marks the ReqRes object as done. func (r *ReqRes) SetDone() { r.mtx.Lock() - defer r.mtx.Unlock() r.done = true + r.mtx.Unlock() } func waitGroup1() (wg *sync.WaitGroup) { diff --git a/abci/client/creators.go b/abci/client/creators.go index 5a71becaf..e17b15eca 100644 --- a/abci/client/creators.go +++ b/abci/client/creators.go @@ -13,7 +13,7 @@ type Creator func() (Client, error) // NewLocalCreator returns a Creator for the given app, // which will be running locally. func NewLocalCreator(app types.Application) Creator { - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) return func() (Client, error) { return NewLocalClient(mtx, app), nil diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 2bfa047bd..f1123fab5 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -24,7 +24,7 @@ type grpcClient struct { conn *grpc.ClientConn chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool - mtx tmsync.RWMutex + mtx tmsync.Mutex addr string err error resCb func(*types.Request, *types.Response) // listens to all callbacks @@ -149,8 +149,8 @@ func (cli *grpcClient) StopForError(err error) { } func (cli *grpcClient) Error() error { - cli.mtx.RLock() - defer cli.mtx.RUnlock() + cli.mtx.Lock() + defer cli.mtx.Unlock() return cli.err } @@ -158,8 +158,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 06ff8171c..701108a3c 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,7 +15,7 @@ import ( type localClient struct { service.BaseService - mtx *tmsync.RWMutex + mtx *tmsync.Mutex types.Application Callback } @@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil) // methods of the given app. // // Both Async and Sync methods ignore the given context.Context parameter. -func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client { +func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { if mtx == nil { - mtx = &tmsync.RWMutex{} + mtx = new(tmsync.Mutex) } - cli := &localClient{ mtx: mtx, Application: app, } - cli.BaseService = *service.NewBaseService(nil, "localClient", cli) return cli } func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() - defer app.mtx.Unlock() app.Callback = cb + app.mtx.Unlock() } // TODO: change types.Application to include Error()? @@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err } func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Info(req) return app.callback( @@ -100,8 +98,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck } func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Query(req) return app.callback( @@ -215,8 +213,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon } func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Info(req) return &res, nil @@ -249,8 +247,8 @@ func (app *localClient) QuerySync( ctx context.Context, req types.RequestQuery, ) (*types.ResponseQuery, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Query(req) return &res, nil diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 8a4598fe5..726c554d4 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -39,7 +39,7 @@ type socketClient struct { reqQueue chan *reqResWithContext - mtx tmsync.RWMutex + mtx tmsync.Mutex err error reqSent *list.List // list of requests sent, waiting for response resCb func(*types.Request, *types.Response) // called on all requests, if set. @@ -102,8 +102,8 @@ func (cli *socketClient) OnStop() { // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { - cli.mtx.RLock() - defer cli.mtx.RUnlock() + cli.mtx.Lock() + defer cli.mtx.Unlock() return cli.err } @@ -113,8 +113,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 81a4f8be8..a826ef79b 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -62,7 +62,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abciclient.NewLocalClient(mtx, app) proxyAppConnCon := abciclient.NewLocalClient(mtx, app) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 5251b0fdb..8b54f6026 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -407,7 +407,7 @@ func newStateWithConfigAndBlockStore( blockStore *store.BlockStore, ) *State { // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abciclient.NewLocalClient(mtx, app) proxyAppConnCon := abciclient.NewLocalClient(mtx, app) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index baffbac30..e536906a9 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -346,7 +346,7 @@ func TestReactorWithEvidence(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abciclient.NewLocalClient(mtx, app) proxyAppConnCon := abciclient.NewLocalClient(mtx, app)