Browse Source

Use ResetFor... callback instead of EventSwitch for mempool new blocks

pull/149/head
Jae Kwon 9 years ago
committed by Ethan Buchman
parent
commit
ceed000c58
4 changed files with 28 additions and 14 deletions
  1. +1
    -1
      consensus/state.go
  2. +2
    -4
      mempool/mempool.go
  3. +23
    -8
      mempool/reactor.go
  4. +2
    -1
      types/keys.go

+ 1
- 1
consensus/state.go View File

@ -1312,7 +1312,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe
cs.stagedState.Save()
// Update mempool.
cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState)
cs.mempoolReactor.ResetForBlockAndState(block, cs.stagedState)
// Fire off event
if cs.evsw != nil && cs.evc != nil {


+ 2
- 4
mempool/mempool.go View File

@ -20,8 +20,6 @@ type Mempool struct {
state *sm.State
cache *sm.BlockCache
txs []types.Tx // TODO: we need to add a map to facilitate replace-by-fee
resetInfo ResetInfo // so broadcast routines can respond to mempool flushing
}
func NewMempool(state *sm.State) *Mempool {
@ -83,7 +81,7 @@ type Range struct {
// "state" is the result of state.AppendBlock("block").
// Txs that are present in "block" are discarded from mempool.
// Txs that have become invalid in the new "state" are also discarded.
func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) ResetInfo {
mem.mtx.Lock()
defer mem.mtx.Unlock()
mem.state = state.Copy()
@ -127,7 +125,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
// We're done!
log.Info("New txs", "txs", validTxs, "oldTxs", mem.txs)
mem.txs = validTxs
mem.resetInfo = ri
return ri
}
func startRange(start *int, i int) {


+ 23
- 8
mempool/reactor.go View File

@ -2,6 +2,7 @@ package mempool
import (
"bytes"
"errors"
"fmt"
"reflect"
"time"
@ -9,6 +10,7 @@ import (
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/wire"
)
@ -16,8 +18,9 @@ import (
var (
MempoolChannel = byte(0x30)
checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer
txsToSendPerCheck = 64 // send up to this many txs from the mempool per check
checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer
txsToSendPerCheck = 64 // send up to this many txs from the mempool per check
newBlockChCapacity = 100 // queue to process this many ResetInfos per peer
)
// MempoolReactor handles mempool tx broadcasting amongst peers.
@ -50,12 +53,8 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
// Implements Reactor
func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
// Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state.
newBlockChan := make(chan ResetInfo)
memR.evsw.(*events.EventSwitch).AddListenerForEvent("broadcastRoutine:"+peer.Key, types.EventStringNewBlock(), func(data types.EventData) {
// no lock needed because consensus is blocking on this
// and the mempool is reset before this event fires
newBlockChan <- memR.Mempool.resetInfo
})
newBlockChan := make(chan ResetInfo, newBlockChCapacity)
peer.Data.Set(types.PeerMempoolChKey, newBlockChan)
timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds))
go memR.broadcastTxRoutine(timer.C, newBlockChan, peer)
}
@ -90,6 +89,22 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
}
}
// "block" is the new block being committed.
// "state" is the result of state.AppendBlock("block").
// Txs that are present in "block" are discarded from mempool.
// Txs that have become invalid in the new "state" are also discarded.
func (memR *MempoolReactor) ResetForBlockAndState(block *types.Block, state *sm.State) {
ri := memR.Mempool.ResetForBlockAndState(block, state)
for _, peer := range memR.Switch.Peers().List() {
peerMempoolCh := peer.Data.Get(types.PeerMempoolChKey).(chan ResetInfo)
select {
case peerMempoolCh <- ri:
default:
memR.Switch.StopPeerForError(peer, errors.New("Peer's mempool push channel full"))
}
}
}
// Just an alias for AddTx since broadcasting happens in peer routines
func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
return memR.Mempool.AddTx(tx)


+ 2
- 1
types/keys.go View File

@ -1,5 +1,6 @@
package types
var (
PeerStateKey = "ConsensusReactor.peerState"
PeerStateKey = "ConsensusReactor.peerState"
PeerMempoolChKey = "MempoolReactor.peerMempoolCh"
)

Loading…
Cancel
Save