Browse Source

comments

pull/3511/head
Ethan Buchman 5 years ago
parent
commit
bcb0b348e7
No known key found for this signature in database GPG Key ID: DBB0B3EC64A4BDAA
2 changed files with 25 additions and 13 deletions
  1. +13
    -8
      abci/client/socket_client.go
  2. +12
    -5
      mempool/mempool.go

+ 13
- 8
abci/client/socket_client.go View File

@ -26,16 +26,17 @@ var _ Client = (*socketClient)(nil)
type socketClient struct {
cmn.BaseService
reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
addr string
mustConnect bool
conn net.Conn
reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
mtx sync.Mutex
addr string
conn net.Conn
err error
reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
}
@ -86,6 +87,7 @@ func (cli *socketClient) OnStop() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
if cli.conn != nil {
// does this really need a mutex?
cli.conn.Close()
}
@ -207,12 +209,15 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
reqres.Done() // Release waiters
cli.reqSent.Remove(next) // Pop first item from linked list
// Notify reqRes listener if set
// Notify reqRes listener if set (request specific callback).
// NOTE: it is possible this callback isn't set on the reqres object
// at this point, in which case it will be called after, when it is set.
// TODO: should we move this after the resCb call so the order is always consistent?
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
// Notify client listener if set
// Notify client listener if set (global callback).
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}


+ 12
- 5
mempool/mempool.go View File

@ -442,11 +442,13 @@ func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
mem.metrics.Size.Set(float64(mem.Size()))
}
// Request specific callback that should be set on individual reqRes objects to incorporate local information
// when processing the response. This allows us to track the peer
// that sent us this tx, so we can avoid sending it back to them.
// NOTE: we could avoid the need for request-specific callbacks if we included the information
// in the ABCI request!
// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
@ -467,6 +469,8 @@ func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Res
}
}
// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func (mem *Mempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap[sha256.Sum256(memTx.tx)] = e
@ -474,6 +478,9 @@ func (mem *Mempool) addTx(memTx *mempoolTx) {
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}
// Called from:
// - Update (lock held) if tx was committed
// - resCbRecheck (lock not held) if tx was invalidated
func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()


Loading…
Cancel
Save