diff --git a/consensus/common_test.go b/consensus/common_test.go index 63f77b09c..6b96ec311 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -240,8 +240,11 @@ func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv *ty proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem) + mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0) mempool.SetLogger(log.TestingLogger().With("module", "mempool")) + if thisConfig.Consensus.NoEmptyBlocks { + mempool.EnableTxsAvailable() + } // Make ConsensusReactor cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 18daf67a6..843a85aa6 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -20,7 +20,7 @@ func TestNoProgressUntilTxsAvailable(t *testing.T) { config.Consensus.NoEmptyBlocks = true state, privVals := randGenesisState(1, false, 10) cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) - cs.mempool.FireOnTxsAvailable() + cs.mempool.EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) startTestRound(cs, height, round) diff --git a/consensus/state.go b/consensus/state.go index d1dec7f59..4695a6304 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -227,9 +227,6 @@ type ConsensusState struct { doPrevote func(height, round int) setProposal func(proposal *types.Proposal) error - // signifies that txs are available for proposal - txsAvailable chan RoundState - // closed when we finish shutting down done chan struct{} } @@ -244,7 +241,6 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), - txsAvailable: make(chan RoundState), done: make(chan struct{}), } // set function defaults (may be overwritten before calling Start) @@ -626,10 +622,8 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { var mi msgInfo select { - case <-cs.txsAvailable: - // use nil for this special internal message signalling txs are available. - // no need to write this to the wal - // cs.handleMsg(msgInfo{nil, ""}, rs_) + case height := <-cs.mempool.TxsAvailable(): + cs.handleTxsAvailable(height) case mi = <-cs.peerMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes @@ -669,9 +663,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { var err error msg, peerKey := mi.Msg, mi.PeerKey switch msg := msg.(type) { - case nil: - // transactions are available, so enterPropose - // cs.enterPropose(rs.Height, rs.Round) case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts @@ -737,6 +728,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { } +func (cs *ConsensusState) handleTxsAvailable(height int) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + // we only need to do this for round 0 + cs.enterPropose(height, 0) +} + //----------------------------------------------------------------------------- // State functions // Used internally by handleTimeout and handleMsg to make state transitions @@ -785,10 +783,11 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) // Wait for txs to be available in the mempool - // before we enterPropose. If the last block changed the app hash, + // before we enterPropose in round 0. If the last block changed the app hash, // we may need an empty "proof" block, and enterPropose immediately. - if cs.config.NoEmptyBlocks && !cs.needProofBlock(height) { - go cs.waitForTxs(height, round) + waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height) + if waitForTxs { + go cs.proposalHeartbeat() } else { cs.enterPropose(height, round) } @@ -808,27 +807,9 @@ func (cs *ConsensusState) needProofBlock(height int) bool { return false } -func (cs *ConsensusState) waitForTxs(height, round int) { - // if we're the proposer, start a heartbeat routine - // to tell other peers we're just waiting for txs (for debugging) - if cs.isProposer() { - done := make(chan struct{}) - defer close(done) - go cs.proposerHeartbeat(done) - } - - // wait for the mempool to have some txs - <-cs.mempool.TxsAvailable() - - // now we can enterPropose - cs.txsAvailable <- RoundState{Height: height, Round: round} -} - -func (cs *ConsensusState) proposerHeartbeat(done chan struct{}) { +func (cs *ConsensusState) proposalHeartbeat() { for { select { - case <-done: - return default: // TODO: broadcast heartbeat diff --git a/mempool/mempool.go b/mempool/mempool.go index fadeb5dac..9f9824f9e 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -56,14 +56,16 @@ const cacheSize = 100000 type Mempool struct { config *cfg.MempoolConfig - proxyMtx sync.Mutex - proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs - counter int64 // simple incrementing counter - height int // the last block Update()'d to - rechecking int32 // for re-checking filtered txs on Update() - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here + proxyMtx sync.Mutex + proxyAppConn proxy.AppConnMempool + txs *clist.CList // concurrent linked-list of good txs + counter int64 // simple incrementing counter + height int // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + notifiedTxsAvailable bool // true if fired on txsAvailable for this height + txsAvailable chan int // fires the next height once for each height, when the mempool is not empty // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -72,21 +74,17 @@ type Mempool struct { // A log of mempool txs wal *auto.AutoFile - // fires once for each height, when the mempool is not empty - txsAvailable chan struct{} - notifiedTxsAvailable bool - logger log.Logger } // NewMempool returns a new Mempool with the given configuration and connection to an application. -func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *Mempool { +func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, txs: clist.New(), counter: 0, - height: 0, + height: height, rechecking: 0, recheckCursor: nil, recheckEnd: nil, @@ -98,11 +96,11 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M return mempool } -// FireOnTxsAvailable initializes the TxsAvailable channel, +// EnableTxsAvailable initializes the TxsAvailable channel, // ensuring it will trigger once every height when transactions are available. // NOTE: not thread safe - should only be called once, on startup -func (mem *Mempool) FireOnTxsAvailable() { - mem.txsAvailable = make(chan struct{}, 1) +func (mem *Mempool) EnableTxsAvailable() { + mem.txsAvailable = make(chan int, 1) } // SetLogger sets the Logger. @@ -278,20 +276,20 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // TxsAvailable returns a channel which fires once for every height, // and only when transactions are available in the mempool. -// XXX: Will panic if mem.FireOnTxsAvailable() has not been called. -func (mem *Mempool) TxsAvailable() chan struct{} { - if mem.txsAvailable == nil { - panic("mem.txsAvailable is nil") - } +// NOTE: the returned channel may be nil if EnableTxsAvailable was not called. +func (mem *Mempool) TxsAvailable() chan int { return mem.txsAvailable } func (mem *Mempool) notifyIfTxsAvailable() { + if mem.Size() == 0 { + panic("notified txs available but mempool is empty!") + } if mem.txsAvailable != nil && - !mem.notifiedTxsAvailable && mem.Size() > 0 { + !mem.notifiedTxsAvailable { mem.notifiedTxsAvailable = true - mem.txsAvailable <- struct{}{} + mem.txsAvailable <- mem.height + 1 } } @@ -339,8 +337,6 @@ func (mem *Mempool) Update(height int, txs types.Txs) { } // Set height - // NOTE: the height is not set until Update is first called - // (so it will be wrong after a restart until the next block) mem.height = height mem.notifiedTxsAvailable = false diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 6da9eff51..2e7a575cf 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -23,12 +23,12 @@ func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool { if _, err := appConnMem.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } - mempool := NewMempool(config.Mempool, appConnMem) + mempool := NewMempool(config.Mempool, appConnMem, 0) mempool.SetLogger(log.TestingLogger()) return mempool } -func ensureNoFire(t *testing.T, ch chan struct{}, timeoutMS int) { +func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: @@ -37,7 +37,7 @@ func ensureNoFire(t *testing.T, ch chan struct{}, timeoutMS int) { } } -func ensureFire(t *testing.T, ch chan struct{}, timeoutMS int) { +func ensureFire(t *testing.T, ch chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: @@ -64,7 +64,7 @@ func TestTxsAvailable(t *testing.T) { app := dummy.NewDummyApplication() cc := proxy.NewLocalClientCreator(app) mempool := newMempoolWithApp(t, cc) - mempool.FireOnTxsAvailable() + mempool.EnableTxsAvailable() timeoutMS := 500 diff --git a/node/node.go b/node/node.go index c76ae5fa3..0e8cc8fae 100644 --- a/node/node.go +++ b/node/node.go @@ -137,14 +137,13 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") - mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool()) + mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) mempool.SetLogger(mempoolLogger) - mempool.Update(state.LastBlockHeight, nil) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger) if config.Consensus.NoEmptyBlocks { - mempool.FireOnTxsAvailable() + mempool.EnableTxsAvailable() } // Make ConsensusReactor diff --git a/types/services.go b/types/services.go index f68702ec6..0008b68e7 100644 --- a/types/services.go +++ b/types/services.go @@ -24,8 +24,8 @@ type Mempool interface { Update(height int, txs Txs) Flush() - TxsAvailable() chan struct{} - FireOnTxsAvailable() + TxsAvailable() chan int + EnableTxsAvailable() } type MockMempool struct { @@ -38,8 +38,8 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int, txs Txs) {} func (m MockMempool) Flush() {} -func (m MockMempool) TxsAvailable() chan struct{} { return make(chan struct{}) } -func (m MockMempool) FireOnTxsAvailable() {} +func (m MockMempool) TxsAvailable() chan int { return make(chan int) } +func (m MockMempool) EnableTxsAvailable() {} //------------------------------------------------------ // blockstore