From ceed000c5881f5f1b7739516ae7d9677a6fd714d Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 29 Sep 2015 09:04:34 -0700 Subject: [PATCH] Use ResetFor... callback instead of EventSwitch for mempool new blocks --- consensus/state.go | 2 +- mempool/mempool.go | 6 ++---- mempool/reactor.go | 31 +++++++++++++++++++++++-------- types/keys.go | 3 ++- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 4850a8d01..aa488b528 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1312,7 +1312,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe cs.stagedState.Save() // Update mempool. - cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState) + cs.mempoolReactor.ResetForBlockAndState(block, cs.stagedState) // Fire off event if cs.evsw != nil && cs.evc != nil { diff --git a/mempool/mempool.go b/mempool/mempool.go index 98813014b..84fa68bd5 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -20,8 +20,6 @@ type Mempool struct { state *sm.State cache *sm.BlockCache txs []types.Tx // TODO: we need to add a map to facilitate replace-by-fee - - resetInfo ResetInfo // so broadcast routines can respond to mempool flushing } func NewMempool(state *sm.State) *Mempool { @@ -83,7 +81,7 @@ type Range struct { // "state" is the result of state.AppendBlock("block"). // Txs that are present in "block" are discarded from mempool. // Txs that have become invalid in the new "state" are also discarded. -func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { +func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) ResetInfo { mem.mtx.Lock() defer mem.mtx.Unlock() mem.state = state.Copy() @@ -127,7 +125,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { // We're done! log.Info("New txs", "txs", validTxs, "oldTxs", mem.txs) mem.txs = validTxs - mem.resetInfo = ri + return ri } func startRange(start *int, i int) { diff --git a/mempool/reactor.go b/mempool/reactor.go index 8992c466b..e7334ca25 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,6 +2,7 @@ package mempool import ( "bytes" + "errors" "fmt" "reflect" "time" @@ -9,6 +10,7 @@ import ( . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/wire" ) @@ -16,8 +18,9 @@ import ( var ( MempoolChannel = byte(0x30) - checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer - txsToSendPerCheck = 64 // send up to this many txs from the mempool per check + checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer + txsToSendPerCheck = 64 // send up to this many txs from the mempool per check + newBlockChCapacity = 100 // queue to process this many ResetInfos per peer ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -50,12 +53,8 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) { // Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state. - newBlockChan := make(chan ResetInfo) - memR.evsw.(*events.EventSwitch).AddListenerForEvent("broadcastRoutine:"+peer.Key, types.EventStringNewBlock(), func(data types.EventData) { - // no lock needed because consensus is blocking on this - // and the mempool is reset before this event fires - newBlockChan <- memR.Mempool.resetInfo - }) + newBlockChan := make(chan ResetInfo, newBlockChCapacity) + peer.Data.Set(types.PeerMempoolChKey, newBlockChan) timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds)) go memR.broadcastTxRoutine(timer.C, newBlockChan, peer) } @@ -90,6 +89,22 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } } +// "block" is the new block being committed. +// "state" is the result of state.AppendBlock("block"). +// Txs that are present in "block" are discarded from mempool. +// Txs that have become invalid in the new "state" are also discarded. +func (memR *MempoolReactor) ResetForBlockAndState(block *types.Block, state *sm.State) { + ri := memR.Mempool.ResetForBlockAndState(block, state) + for _, peer := range memR.Switch.Peers().List() { + peerMempoolCh := peer.Data.Get(types.PeerMempoolChKey).(chan ResetInfo) + select { + case peerMempoolCh <- ri: + default: + memR.Switch.StopPeerForError(peer, errors.New("Peer's mempool push channel full")) + } + } +} + // Just an alias for AddTx since broadcasting happens in peer routines func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return memR.Mempool.AddTx(tx) diff --git a/types/keys.go b/types/keys.go index 3f81e46f1..90591b959 100644 --- a/types/keys.go +++ b/types/keys.go @@ -1,5 +1,6 @@ package types var ( - PeerStateKey = "ConsensusReactor.peerState" + PeerStateKey = "ConsensusReactor.peerState" + PeerMempoolChKey = "MempoolReactor.peerMempoolCh" )