diff --git a/consensus/state.go b/consensus/state.go index d46ec5830..3834b1515 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -460,9 +460,12 @@ func (cs *ConsensusState) updateToState(state sm.State) { // If state isn't further out than cs.state, just ignore. // This happens when SwitchToConsensus() is called in the reactor. - // We don't want to reset e.g. the Votes. + // We don't want to reset e.g. the Votes, but we still want to + // signal the new round step, because other services (eg. mempool) + // depend on having an up-to-date peer state! if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) { cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) + cs.newStep() return } @@ -492,6 +495,7 @@ func (cs *ConsensusState) updateToState(state sm.State) { } else { cs.StartTime = cs.config.Commit(cs.CommitTime) } + cs.Validators = validators cs.Proposal = nil cs.ProposalBlock = nil @@ -517,7 +521,7 @@ func (cs *ConsensusState) newStep() { rs := cs.RoundStateEvent() cs.wal.Write(rs) cs.nSteps++ - // newStep is called by updateToStep in NewConsensusState before the eventBus is set! + // newStep is called by updateToState in NewConsensusState before the eventBus is set! if cs.eventBus != nil { cs.eventBus.PublishEventNewRoundStep(rs) cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState) diff --git a/mempool/mempool.go b/mempool/mempool.go index 5af16b3c9..bde4984b1 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -328,11 +328,11 @@ func (mem *Mempool) notifyTxsAvailable() { panic("notified txs available but mempool is empty!") } if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { + // channel cap is 1, so this will send once select { case mem.txsAvailable <- mem.height + 1: default: } - mem.notifiedTxsAvailable = true } } diff --git a/mempool/reactor.go b/mempool/reactor.go index 54a3c32fe..76758704c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -103,6 +103,7 @@ type PeerState interface { // Send new mempool txs to peer. func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { + memR.Logger.Info("Tx broadcasting is disabled") return } @@ -129,7 +130,8 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { height := memTx.Height() if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { peerState := peerState_i.(PeerState) - if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block + peerHeight := peerState.GetHeight() + if peerHeight < height-1 { // Allow for a lag of 1 block time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } diff --git a/types/keys.go b/types/keys.go index 992551191..941e82b65 100644 --- a/types/keys.go +++ b/types/keys.go @@ -2,6 +2,5 @@ package types // UNSTABLE var ( - PeerStateKey = "ConsensusReactor.peerState" - PeerMempoolChKey = "MempoolReactor.peerMempoolCh" + PeerStateKey = "ConsensusReactor.peerState" )