Browse Source

mempool: comments

pull/588/head
Ethan Buchman 7 years ago
parent
commit
311f18bebf
2 changed files with 41 additions and 20 deletions
  1. +25
    -14
      mempool/mempool.go
  2. +16
    -6
      mempool/reactor.go

+ 25
- 14
mempool/mempool.go View File

@ -50,6 +50,9 @@ TODO: Better handle abci client errors. (make it automatically handle connection
const cacheSize = 100000
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus round.
// Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool.
// The Mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.
type Mempool struct {
config *cfg.MempoolConfig
@ -72,6 +75,7 @@ type Mempool struct {
logger log.Logger
}
// NewMempool returns a new Mempool with the given configuration and connection to an application.
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool {
mempool := &Mempool{
config: config,
@ -90,7 +94,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M
return mempool
}
// SetLogger allows you to set your own Logger.
// SetLogger sets the Logger.
func (mem *Mempool) SetLogger(l log.Logger) {
mem.logger = l
}
@ -110,21 +114,22 @@ func (mem *Mempool) initWAL() {
}
}
// consensus must be able to hold lock to safely update
// Lock locks the mempool. The consensus must be able to hold lock to safely update.
func (mem *Mempool) Lock() {
mem.proxyMtx.Lock()
}
// Unlock unlocks the mempool.
func (mem *Mempool) Unlock() {
mem.proxyMtx.Unlock()
}
// Number of transactions in the mempool clist
// Size returns the number of transactions in the mempool.
func (mem *Mempool) Size() int {
return mem.txs.Len()
}
// Remove all transactions from mempool and cache
// Flush removes all transactions from the mempool and cache
func (mem *Mempool) Flush() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@ -137,14 +142,15 @@ func (mem *Mempool) Flush() {
}
}
// Return the first element of mem.txs for peer goroutines to call .NextWait() on.
// Blocks until txs has elements.
// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on.
// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element)
func (mem *Mempool) TxsFrontWait() *clist.CElement {
return mem.txs.FrontWait()
}
// Try a new transaction in the mempool.
// Potentially blocking if we're blocking on Update() or Reap().
// CheckTx executes a new transaction against the application to determine its validity
// and whether it should be added to the mempool.
// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
@ -256,8 +262,8 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
}
// Get the valid transactions remaining
// If maxTxs is -1, there is no cap on returned transactions.
// Reap returns a list of transactions currently in the mempool.
// If maxTxs is -1, there is no cap on the number of returned transactions.
func (mem *Mempool) Reap(maxTxs int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@ -286,8 +292,7 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
return txs
}
// Tell mempool that these txs were committed.
// Mempool will discard these txs.
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int, txs types.Txs) {
@ -354,19 +359,21 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
//--------------------------------------------------------------------------------
// A transaction that successfully ran
// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
counter int64 // a simple incrementing counter
height int64 // height that this tx had been validated in
tx types.Tx //
}
// Height returns the height for this transaction
func (memTx *mempoolTx) Height() int {
return int(atomic.LoadInt64(&memTx.height))
}
//--------------------------------------------------------------------------------
// txCache maintains a cache of transactions.
type txCache struct {
mtx sync.Mutex
size int
@ -374,6 +381,7 @@ type txCache struct {
list *list.List // to remove oldest tx when cache gets too big
}
// newTxCache returns a new txCache.
func newTxCache(cacheSize int) *txCache {
return &txCache{
size: cacheSize,
@ -382,6 +390,7 @@ func newTxCache(cacheSize int) *txCache {
}
}
// Reset resets the txCache to empty.
func (cache *txCache) Reset() {
cache.mtx.Lock()
cache.map_ = make(map[string]struct{}, cacheSize)
@ -389,6 +398,7 @@ func (cache *txCache) Reset() {
cache.mtx.Unlock()
}
// Exists returns true if the given tx is cached.
func (cache *txCache) Exists(tx types.Tx) bool {
cache.mtx.Lock()
_, exists := cache.map_[string(tx)]
@ -396,7 +406,7 @@ func (cache *txCache) Exists(tx types.Tx) bool {
return exists
}
// Returns false if tx is in cache.
// Push adds the given tx to the txCache. It returns false if tx is already in the cache.
func (cache *txCache) Push(tx types.Tx) bool {
cache.mtx.Lock()
defer cache.mtx.Unlock()
@ -418,6 +428,7 @@ func (cache *txCache) Push(tx types.Tx) bool {
return true
}
// Remove removes the given tx from the cache.
func (cache *txCache) Remove(tx types.Tx) {
cache.mtx.Lock()
delete(cache.map_, string(tx))


+ 16
- 6
mempool/reactor.go View File

@ -30,6 +30,7 @@ type MempoolReactor struct {
evsw types.EventSwitch
}
// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{
config: config,
@ -39,7 +40,8 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
return memR
}
// Implements Reactor
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
@ -49,17 +51,19 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
}
}
// Implements Reactor
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
go memR.broadcastTxRoutine(peer)
}
// Implements Reactor
// RemovePeer implements Reactor.
func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// broadcast routine checks if peer is gone and returns
}
// Implements Reactor
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
@ -84,15 +88,17 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
}
}
// Just an alias for CheckTx since broadcasting happens in peer routines
// BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
return memR.Mempool.CheckTx(tx, cb)
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int
}
// Peer describes a peer.
type Peer interface {
IsRunning() bool
Send(byte, interface{}) bool
@ -141,7 +147,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
}
}
// implements events.Eventable
// SetEventSwitch implements events.Eventable.
func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
memR.evsw = evsw
}
@ -153,6 +159,7 @@ const (
msgTypeTx = byte(0x01)
)
// MempoolMessage is a message sent or received by the MempoolReactor.
type MempoolMessage interface{}
var _ = wire.RegisterInterface(
@ -160,6 +167,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&TxMessage{}, msgTypeTx},
)
// DecodeMessage decodes a byte-array into a MempoolMessage.
func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
msgType = bz[0]
n := new(int)
@ -170,10 +178,12 @@ func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
//-------------------------------------
// TxMessage is a MempoolMessage containing a transaction.
type TxMessage struct {
Tx types.Tx
}
// String returns a string representation of the TxMessage.
func (m *TxMessage) String() string {
return fmt.Sprintf("[TxMessage %v]", m.Tx)
}

Loading…
Cancel
Save