Browse Source

fixes from review; use mempool.TxsAvailable() directly

pull/591/head
Ethan Buchman 8 years ago
parent
commit
3444bee47f
7 changed files with 51 additions and 72 deletions
  1. +4
    -1
      consensus/common_test.go
  2. +1
    -1
      consensus/mempool_test.go
  3. +14
    -33
      consensus/state.go
  4. +22
    -26
      mempool/mempool.go
  5. +4
    -4
      mempool/mempool_test.go
  6. +2
    -3
      node/node.go
  7. +4
    -4
      types/services.go

+ 4
- 1
consensus/common_test.go View File

@ -240,8 +240,11 @@ func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv *ty
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
// Make Mempool
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem)
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
if thisConfig.Consensus.NoEmptyBlocks {
mempool.EnableTxsAvailable()
}
// Make ConsensusReactor
cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool)


+ 1
- 1
consensus/mempool_test.go View File

@ -20,7 +20,7 @@ func TestNoProgressUntilTxsAvailable(t *testing.T) {
config.Consensus.NoEmptyBlocks = true
state, privVals := randGenesisState(1, false, 10)
cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
cs.mempool.FireOnTxsAvailable()
cs.mempool.EnableTxsAvailable()
height, round := cs.Height, cs.Round
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
startTestRound(cs, height, round)


+ 14
- 33
consensus/state.go View File

@ -227,9 +227,6 @@ type ConsensusState struct {
doPrevote func(height, round int)
setProposal func(proposal *types.Proposal) error
// signifies that txs are available for proposal
txsAvailable chan RoundState
// closed when we finish shutting down
done chan struct{}
}
@ -244,7 +241,6 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
txsAvailable: make(chan RoundState),
done: make(chan struct{}),
}
// set function defaults (may be overwritten before calling Start)
@ -626,10 +622,8 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
var mi msgInfo
select {
case <-cs.txsAvailable:
// use nil for this special internal message signalling txs are available.
// no need to write this to the wal
// cs.handleMsg(msgInfo{nil, ""}, rs_)
case height := <-cs.mempool.TxsAvailable():
cs.handleTxsAvailable(height)
case mi = <-cs.peerMsgQueue:
cs.wal.Save(mi)
// handles proposals, block parts, votes
@ -669,9 +663,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
var err error
msg, peerKey := mi.Msg, mi.PeerKey
switch msg := msg.(type) {
case nil:
// transactions are available, so enterPropose
// cs.enterPropose(rs.Height, rs.Round)
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
@ -737,6 +728,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
}
func (cs *ConsensusState) handleTxsAvailable(height int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// we only need to do this for round 0
cs.enterPropose(height, 0)
}
//-----------------------------------------------------------------------------
// State functions
// Used internally by handleTimeout and handleMsg to make state transitions
@ -785,10 +783,11 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
types.FireEventNewRound(cs.evsw, cs.RoundStateEvent())
// Wait for txs to be available in the mempool
// before we enterPropose. If the last block changed the app hash,
// before we enterPropose in round 0. If the last block changed the app hash,
// we may need an empty "proof" block, and enterPropose immediately.
if cs.config.NoEmptyBlocks && !cs.needProofBlock(height) {
go cs.waitForTxs(height, round)
waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height)
if waitForTxs {
go cs.proposalHeartbeat()
} else {
cs.enterPropose(height, round)
}
@ -808,27 +807,9 @@ func (cs *ConsensusState) needProofBlock(height int) bool {
return false
}
func (cs *ConsensusState) waitForTxs(height, round int) {
// if we're the proposer, start a heartbeat routine
// to tell other peers we're just waiting for txs (for debugging)
if cs.isProposer() {
done := make(chan struct{})
defer close(done)
go cs.proposerHeartbeat(done)
}
// wait for the mempool to have some txs
<-cs.mempool.TxsAvailable()
// now we can enterPropose
cs.txsAvailable <- RoundState{Height: height, Round: round}
}
func (cs *ConsensusState) proposerHeartbeat(done chan struct{}) {
func (cs *ConsensusState) proposalHeartbeat() {
for {
select {
case <-done:
return
default:
// TODO: broadcast heartbeat


+ 22
- 26
mempool/mempool.go View File

@ -56,14 +56,16 @@ const cacheSize = 100000
type Mempool struct {
config *cfg.MempoolConfig
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
notifiedTxsAvailable bool // true if fired on txsAvailable for this height
txsAvailable chan int // fires the next height once for each height, when the mempool is not empty
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
@ -72,21 +74,17 @@ type Mempool struct {
// A log of mempool txs
wal *auto.AutoFile
// fires once for each height, when the mempool is not empty
txsAvailable chan struct{}
notifiedTxsAvailable bool
logger log.Logger
}
// NewMempool returns a new Mempool with the given configuration and connection to an application.
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool {
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool {
mempool := &Mempool{
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
counter: 0,
height: 0,
height: height,
rechecking: 0,
recheckCursor: nil,
recheckEnd: nil,
@ -98,11 +96,11 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M
return mempool
}
// FireOnTxsAvailable initializes the TxsAvailable channel,
// EnableTxsAvailable initializes the TxsAvailable channel,
// ensuring it will trigger once every height when transactions are available.
// NOTE: not thread safe - should only be called once, on startup
func (mem *Mempool) FireOnTxsAvailable() {
mem.txsAvailable = make(chan struct{}, 1)
func (mem *Mempool) EnableTxsAvailable() {
mem.txsAvailable = make(chan int, 1)
}
// SetLogger sets the Logger.
@ -278,20 +276,20 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// TxsAvailable returns a channel which fires once for every height,
// and only when transactions are available in the mempool.
// XXX: Will panic if mem.FireOnTxsAvailable() has not been called.
func (mem *Mempool) TxsAvailable() chan struct{} {
if mem.txsAvailable == nil {
panic("mem.txsAvailable is nil")
}
// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.
func (mem *Mempool) TxsAvailable() chan int {
return mem.txsAvailable
}
func (mem *Mempool) notifyIfTxsAvailable() {
if mem.Size() == 0 {
panic("notified txs available but mempool is empty!")
}
if mem.txsAvailable != nil &&
!mem.notifiedTxsAvailable && mem.Size() > 0 {
!mem.notifiedTxsAvailable {
mem.notifiedTxsAvailable = true
mem.txsAvailable <- struct{}{}
mem.txsAvailable <- mem.height + 1
}
}
@ -339,8 +337,6 @@ func (mem *Mempool) Update(height int, txs types.Txs) {
}
// Set height
// NOTE: the height is not set until Update is first called
// (so it will be wrong after a restart until the next block)
mem.height = height
mem.notifiedTxsAvailable = false


+ 4
- 4
mempool/mempool_test.go View File

@ -23,12 +23,12 @@ func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool {
if _, err := appConnMem.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error())
}
mempool := NewMempool(config.Mempool, appConnMem)
mempool := NewMempool(config.Mempool, appConnMem, 0)
mempool.SetLogger(log.TestingLogger())
return mempool
}
func ensureNoFire(t *testing.T, ch chan struct{}, timeoutMS int) {
func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) {
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
select {
case <-ch:
@ -37,7 +37,7 @@ func ensureNoFire(t *testing.T, ch chan struct{}, timeoutMS int) {
}
}
func ensureFire(t *testing.T, ch chan struct{}, timeoutMS int) {
func ensureFire(t *testing.T, ch chan int, timeoutMS int) {
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
select {
case <-ch:
@ -64,7 +64,7 @@ func TestTxsAvailable(t *testing.T) {
app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(t, cc)
mempool.FireOnTxsAvailable()
mempool.EnableTxsAvailable()
timeoutMS := 500


+ 2
- 3
node/node.go View File

@ -137,14 +137,13 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat
// Make MempoolReactor
mempoolLogger := logger.With("module", "mempool")
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool())
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
mempool.SetLogger(mempoolLogger)
mempool.Update(state.LastBlockHeight, nil)
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)
if config.Consensus.NoEmptyBlocks {
mempool.FireOnTxsAvailable()
mempool.EnableTxsAvailable()
}
// Make ConsensusReactor


+ 4
- 4
types/services.go View File

@ -24,8 +24,8 @@ type Mempool interface {
Update(height int, txs Txs)
Flush()
TxsAvailable() chan struct{}
FireOnTxsAvailable()
TxsAvailable() chan int
EnableTxsAvailable()
}
type MockMempool struct {
@ -38,8 +38,8 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil
func (m MockMempool) Reap(n int) Txs { return Txs{} }
func (m MockMempool) Update(height int, txs Txs) {}
func (m MockMempool) Flush() {}
func (m MockMempool) TxsAvailable() chan struct{} { return make(chan struct{}) }
func (m MockMempool) FireOnTxsAvailable() {}
func (m MockMempool) TxsAvailable() chan int { return make(chan int) }
func (m MockMempool) EnableTxsAvailable() {}
//------------------------------------------------------
// blockstore


Loading…
Cancel
Save