From 311f18bebfe885bd8adc7e45174035cc352752b4 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 13 Jul 2017 13:07:04 -0400 Subject: [PATCH] mempool: comments --- mempool/mempool.go | 39 +++++++++++++++++++++++++-------------- mempool/reactor.go | 22 ++++++++++++++++------ 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index fd922ed8e..e804f9b35 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -50,6 +50,9 @@ TODO: Better handle abci client errors. (make it automatically handle connection const cacheSize = 100000 +// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus round. +// Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool. +// The Mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers. type Mempool struct { config *cfg.MempoolConfig @@ -72,6 +75,7 @@ type Mempool struct { logger log.Logger } +// NewMempool returns a new Mempool with the given configuration and connection to an application. func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool { mempool := &Mempool{ config: config, @@ -90,7 +94,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M return mempool } -// SetLogger allows you to set your own Logger. +// SetLogger sets the Logger. func (mem *Mempool) SetLogger(l log.Logger) { mem.logger = l } @@ -110,21 +114,22 @@ func (mem *Mempool) initWAL() { } } -// consensus must be able to hold lock to safely update +// Lock locks the mempool. The consensus must be able to hold lock to safely update. func (mem *Mempool) Lock() { mem.proxyMtx.Lock() } +// Unlock unlocks the mempool. func (mem *Mempool) Unlock() { mem.proxyMtx.Unlock() } -// Number of transactions in the mempool clist +// Size returns the number of transactions in the mempool. func (mem *Mempool) Size() int { return mem.txs.Len() } -// Remove all transactions from mempool and cache +// Flush removes all transactions from the mempool and cache func (mem *Mempool) Flush() { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -137,14 +142,15 @@ func (mem *Mempool) Flush() { } } -// Return the first element of mem.txs for peer goroutines to call .NextWait() on. -// Blocks until txs has elements. +// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on. +// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element) func (mem *Mempool) TxsFrontWait() *clist.CElement { return mem.txs.FrontWait() } -// Try a new transaction in the mempool. -// Potentially blocking if we're blocking on Update() or Reap(). +// CheckTx executes a new transaction against the application to determine its validity +// and whether it should be added to the mempool. +// It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. @@ -256,8 +262,8 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { } } -// Get the valid transactions remaining -// If maxTxs is -1, there is no cap on returned transactions. +// Reap returns a list of transactions currently in the mempool. +// If maxTxs is -1, there is no cap on the number of returned transactions. func (mem *Mempool) Reap(maxTxs int) types.Txs { mem.proxyMtx.Lock() defer mem.proxyMtx.Unlock() @@ -286,8 +292,7 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs { return txs } -// Tell mempool that these txs were committed. -// Mempool will discard these txs. +// Update informs the mempool that the given txs were committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int, txs types.Txs) { @@ -354,19 +359,21 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { //-------------------------------------------------------------------------------- -// A transaction that successfully ran +// mempoolTx is a transaction that successfully ran type mempoolTx struct { counter int64 // a simple incrementing counter height int64 // height that this tx had been validated in tx types.Tx // } +// Height returns the height for this transaction func (memTx *mempoolTx) Height() int { return int(atomic.LoadInt64(&memTx.height)) } //-------------------------------------------------------------------------------- +// txCache maintains a cache of transactions. type txCache struct { mtx sync.Mutex size int @@ -374,6 +381,7 @@ type txCache struct { list *list.List // to remove oldest tx when cache gets too big } +// newTxCache returns a new txCache. func newTxCache(cacheSize int) *txCache { return &txCache{ size: cacheSize, @@ -382,6 +390,7 @@ func newTxCache(cacheSize int) *txCache { } } +// Reset resets the txCache to empty. func (cache *txCache) Reset() { cache.mtx.Lock() cache.map_ = make(map[string]struct{}, cacheSize) @@ -389,6 +398,7 @@ func (cache *txCache) Reset() { cache.mtx.Unlock() } +// Exists returns true if the given tx is cached. func (cache *txCache) Exists(tx types.Tx) bool { cache.mtx.Lock() _, exists := cache.map_[string(tx)] @@ -396,7 +406,7 @@ func (cache *txCache) Exists(tx types.Tx) bool { return exists } -// Returns false if tx is in cache. +// Push adds the given tx to the txCache. It returns false if tx is already in the cache. func (cache *txCache) Push(tx types.Tx) bool { cache.mtx.Lock() defer cache.mtx.Unlock() @@ -418,6 +428,7 @@ func (cache *txCache) Push(tx types.Tx) bool { return true } +// Remove removes the given tx from the cache. func (cache *txCache) Remove(tx types.Tx) { cache.mtx.Lock() delete(cache.map_, string(tx)) diff --git a/mempool/reactor.go b/mempool/reactor.go index 25806c00f..e4ee417fd 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -30,6 +30,7 @@ type MempoolReactor struct { evsw types.EventSwitch } +// NewMempoolReactor returns a new MempoolReactor with the given config and mempool. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ config: config, @@ -39,7 +40,8 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac return memR } -// Implements Reactor +// GetChannels implements Reactor. +// It returns the list of channels for this reactor. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ @@ -49,17 +51,19 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// Implements Reactor +// AddPeer implements Reactor. +// It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) { go memR.broadcastTxRoutine(peer) } -// Implements Reactor +// RemovePeer implements Reactor. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // broadcast routine checks if peer is gone and returns } -// Implements Reactor +// Receive implements Reactor. +// It adds any received transactions to the mempool. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { @@ -84,15 +88,17 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } } -// Just an alias for CheckTx since broadcasting happens in peer routines +// BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error { return memR.Mempool.CheckTx(tx, cb) } +// PeerState describes the state of a peer. type PeerState interface { GetHeight() int } +// Peer describes a peer. type Peer interface { IsRunning() bool Send(byte, interface{}) bool @@ -141,7 +147,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } } -// implements events.Eventable +// SetEventSwitch implements events.Eventable. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) { memR.evsw = evsw } @@ -153,6 +159,7 @@ const ( msgTypeTx = byte(0x01) ) +// MempoolMessage is a message sent or received by the MempoolReactor. type MempoolMessage interface{} var _ = wire.RegisterInterface( @@ -160,6 +167,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&TxMessage{}, msgTypeTx}, ) +// DecodeMessage decodes a byte-array into a MempoolMessage. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) { msgType = bz[0] n := new(int) @@ -170,10 +178,12 @@ func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) { //------------------------------------- +// TxMessage is a MempoolMessage containing a transaction. type TxMessage struct { Tx types.Tx } +// String returns a string representation of the TxMessage. func (m *TxMessage) String() string { return fmt.Sprintf("[TxMessage %v]", m.Tx) }