Browse Source

abci: change client to use multi-reader mutexes (backport #6306) (#6873)

release/v0.34.13
mergify[bot] 3 years ago
committed by GitHub
parent
commit
e0c6199aae
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 37 additions and 29 deletions
  1. +4
    -4
      abci/client/client.go
  2. +4
    -4
      abci/client/grpc_client.go
  3. +20
    -12
      abci/client/local_client.go
  4. +4
    -4
      abci/client/socket_client.go
  5. +1
    -1
      consensus/byzantine_test.go
  6. +1
    -1
      consensus/common_test.go
  7. +1
    -1
      consensus/reactor_test.go
  8. +2
    -2
      proxy/client.go

+ 4
- 4
abci/client/client.go View File

@ -81,7 +81,7 @@ type ReqRes struct {
*sync.WaitGroup *sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup. *types.Response // Not set atomically, so be sure to use WaitGroup.
mtx tmsync.Mutex
mtx tmsync.RWMutex
done bool // Gets set to true once *after* WaitGroup.Done(). done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set. cb func(*types.Response) // A single callback that may be set.
} }
@ -131,16 +131,16 @@ func (r *ReqRes) InvokeCallback() {
// //
// ref: https://github.com/tendermint/tendermint/issues/5439 // ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) { func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.cb return r.cb
} }
// SetDone marks the ReqRes object as done. // SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() { func (r *ReqRes) SetDone() {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true r.done = true
r.mtx.Unlock()
} }
func waitGroup1() (wg *sync.WaitGroup) { func waitGroup1() (wg *sync.WaitGroup) {


+ 4
- 4
abci/client/grpc_client.go View File

@ -27,7 +27,7 @@ type grpcClient struct {
conn *grpc.ClientConn conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
mtx tmsync.Mutex
mtx tmsync.RWMutex
addr string addr string
err error err error
resCb func(*types.Request, *types.Response) // listens to all callbacks resCb func(*types.Request, *types.Response) // listens to all callbacks
@ -146,8 +146,8 @@ func (cli *grpcClient) StopForError(err error) {
} }
func (cli *grpcClient) Error() error { func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err return cli.err
} }
@ -155,8 +155,8 @@ func (cli *grpcClient) Error() error {
// NOTE: callback may get internally generated flush responses. // NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) { func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb cli.resCb = resCb
cli.mtx.Unlock()
} }
//---------------------------------------- //----------------------------------------


+ 20
- 12
abci/client/local_client.go View File

@ -15,27 +15,35 @@ var _ Client = (*localClient)(nil)
type localClient struct { type localClient struct {
service.BaseService service.BaseService
mtx *tmsync.Mutex
mtx *tmsync.RWMutex
types.Application types.Application
Callback Callback
} }
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
var _ Client = (*localClient)(nil)
// NewLocalClient creates a local client, which will be directly calling the
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
if mtx == nil { if mtx == nil {
mtx = new(tmsync.Mutex)
mtx = &tmsync.RWMutex{}
} }
cli := &localClient{ cli := &localClient{
mtx: mtx, mtx: mtx,
Application: app, Application: app,
} }
cli.BaseService = *service.NewBaseService(nil, "localClient", cli) cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli return cli
} }
func (app *localClient) SetResponseCallback(cb Callback) { func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock() app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb app.Callback = cb
app.mtx.Unlock()
} }
// TODO: change types.Application to include Error()? // TODO: change types.Application to include Error()?
@ -59,8 +67,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {
} }
func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()
res := app.Application.Info(req) res := app.Application.Info(req)
return app.callback( return app.callback(
@ -103,8 +111,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
} }
func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()
res := app.Application.Query(req) res := app.Application.Query(req)
return app.callback( return app.callback(
@ -212,8 +220,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
} }
func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()
res := app.Application.Info(req) res := app.Application.Info(req)
return &res, nil return &res, nil
@ -244,8 +252,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
} }
func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()
res := app.Application.Query(req) res := app.Application.Query(req)
return &res, nil return &res, nil


+ 4
- 4
abci/client/socket_client.go View File

@ -34,7 +34,7 @@ type socketClient struct {
reqQueue chan *ReqRes reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer flushTimer *timer.ThrottleTimer
mtx tmsync.Mutex
mtx tmsync.RWMutex
err error err error
reqSent *list.List // list of requests sent, waiting for response reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set. resCb func(*types.Request, *types.Response) // called on all requests, if set.
@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() {
// Error returns an error if the client was stopped abruptly. // Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error { func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err return cli.err
} }
@ -110,8 +110,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses. // NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) { func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock() cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb cli.resCb = resCb
cli.mtx.Unlock()
} }
//---------------------------------------- //----------------------------------------


+ 1
- 1
consensus/byzantine_test.go View File

@ -59,7 +59,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB) blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus // one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app)


+ 1
- 1
consensus/common_test.go View File

@ -389,7 +389,7 @@ func newStateWithConfigAndBlockStore(
blockStore := store.NewBlockStore(blockDB) blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus // one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app)


+ 1
- 1
consensus/reactor_test.go View File

@ -153,7 +153,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB) blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus // one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app)


+ 2
- 2
proxy/client.go View File

@ -20,7 +20,7 @@ type ClientCreator interface {
// local proxy uses a mutex on an in-proc app // local proxy uses a mutex on an in-proc app
type localClientCreator struct { type localClientCreator struct {
mtx *tmsync.Mutex
mtx *tmsync.RWMutex
app types.Application app types.Application
} }
@ -28,7 +28,7 @@ type localClientCreator struct {
// which will be running locally. // which will be running locally.
func NewLocalClientCreator(app types.Application) ClientCreator { func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{ return &localClientCreator{
mtx: new(tmsync.Mutex),
mtx: new(tmsync.RWMutex),
app: app, app: app,
} }
} }


Loading…
Cancel
Save