From 4f73748bc805227872ef30ac569b872949aeb111 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 27 Jul 2021 15:34:06 -0400 Subject: [PATCH] mempool v1: tweak broadcastTxRoutine (#6771) --- internal/mempool/v1/mempool.go | 4 ++-- internal/mempool/v1/reactor.go | 21 ++++++++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index aab3020ef..850600697 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -188,8 +188,8 @@ func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { // NextGossipTx returns the next valid transaction to gossip. A caller must wait // for WaitForNextTx to signal a transaction is available to gossip first. It is // thread-safe. -func (txmp *TxMempool) NextGossipTx() *WrappedTx { - return txmp.gossipIndex.Front().Value.(*WrappedTx) +func (txmp *TxMempool) NextGossipTx() *clist.CElement { + return txmp.gossipIndex.Front() } // EnableTxsAvailable enables the mempool to trigger events when transactions diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 436dd9d27..9deb7aace 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -9,6 +9,7 @@ import ( "time" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/internal/libs/clist" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" @@ -306,7 +307,7 @@ func (r *Reactor) processPeerUpdates() { func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) { peerMempoolID := r.ids.GetForPeer(peerID) - var memTx *WrappedTx + var nextGossipTx *clist.CElement // remove the peer ID from the map of routines and mark the waitgroup as done defer func() { @@ -333,10 +334,10 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) // This happens because the CElement we were looking at got garbage // collected (removed). That is, .NextWait() returned nil. Go ahead and // start from the beginning. - if memTx == nil { + if nextGossipTx == nil { select { case <-r.mempool.WaitForNextTx(): // wait until a tx is available - if memTx = r.mempool.NextGossipTx(); memTx == nil { + if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { continue } @@ -352,6 +353,8 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } } + memTx := nextGossipTx.Value.(*WrappedTx) + if r.peerMgr != nil { height := r.peerMgr.GetHeight(peerID) if height > 0 && height < memTx.height-1 { @@ -380,16 +383,8 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } select { - case <-memTx.gossipEl.NextWaitChan(): - // If there is a next element in gossip index, we point memTx to that node's - // value, otherwise we reset memTx to nil which will be checked at the - // parent for loop. - next := memTx.gossipEl.Next() - if next != nil { - memTx = next.Value.(*WrappedTx) - } else { - memTx = nil - } + case <-nextGossipTx.NextWaitChan(): + nextGossipTx = nextGossipTx.Next() case <-closer.Done(): // The peer is marked for removal via a PeerUpdate as the doneCh was