diff --git a/consensus/common_test.go b/consensus/common_test.go index bf805e095..c59a6d969 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -332,7 +332,7 @@ func consensusLogger() log.Logger { }) } -func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState { +func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application, configOpts ...func(*cfg.Config)) []*ConsensusState { genDoc, privVals := randGenesisDoc(nValidators, false, 10) css := make([]*ConsensusState, nValidators) logger := consensusLogger() @@ -342,6 +342,9 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou state.SetLogger(logger.With("module", "state", "validator", i)) state.Save() thisConfig := ResetConfig(Fmt("%s_%d", testName, i)) + for _, opt := range configOpts { + opt(thisConfig) + } ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], appFunc()) css[i].SetLogger(logger.With("validator", i)) diff --git a/consensus/reactor.go b/consensus/reactor.go index 72d731aca..fb25c68f6 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1178,6 +1178,8 @@ const ( msgTypeHasVote = byte(0x15) msgTypeVoteSetMaj23 = byte(0x16) msgTypeVoteSetBits = byte(0x17) + + msgTypeProposalHeartbeat = byte(0x20) ) // ConsensusMessage is a message that can be sent and received on the ConsensusReactor @@ -1194,6 +1196,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote}, wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23}, wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, + wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat}, ) // DecodeMessage decodes the given bytes into a ConsensusMessage. diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a1ab37026..b1f9a0a5b 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -7,9 +7,11 @@ import ( "time" "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/events" + + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/events" ) func init() { @@ -76,6 +78,35 @@ func TestReactor(t *testing.T) { }, css) } +// Ensure a testnet sends proposal heartbeats and makes blocks when there are txs +func TestReactorProposalHeartbeats(t *testing.T) { + N := 4 + css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter, + func(c *cfg.Config) { + c.Consensus.CreateEmptyBlocks = false + }) + reactors, eventChans := startConsensusNet(t, css, N, false) + defer stopConsensusNet(reactors) + heartbeatChans := make([]chan interface{}, N) + for i := 0; i < N; i++ { + heartbeatChans[i] = subscribeToEvent(css[i].evsw, "tester", types.EventStringProposalHeartbeat(), 1) + } + // wait till everyone sends a proposal heartbeat + timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { + <-heartbeatChans[j] + wg.Done() + }, css) + + // send a tx + css[3].mempool.CheckTx([]byte{1, 2, 3}, nil) + + // wait till everyone makes the first new block + timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { + <-eventChans[j] + wg.Done() + }, css) +} + //------------------------------------------------------------- // ensure we can make blocks despite cycling a validator set diff --git a/mempool/mempool.go b/mempool/mempool.go index 6113910b7..9638baf26 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -277,7 +277,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // TxsAvailable returns a channel which fires once for every height, // and only when transactions are available in the mempool. // NOTE: the returned channel may be nil if EnableTxsAvailable was not called. -func (mem *Mempool) TxsAvailable() chan int { +func (mem *Mempool) TxsAvailable() <-chan int { return mem.txsAvailable } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 2e7a575cf..ee61a468b 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -28,7 +28,7 @@ func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool { return mempool } -func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) { +func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: @@ -37,7 +37,7 @@ func ensureNoFire(t *testing.T, ch chan int, timeoutMS int) { } } -func ensureFire(t *testing.T, ch chan int, timeoutMS int) { +func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: diff --git a/types/services.go b/types/services.go index 0008b68e7..1805a0090 100644 --- a/types/services.go +++ b/types/services.go @@ -24,7 +24,7 @@ type Mempool interface { Update(height int, txs Txs) Flush() - TxsAvailable() chan int + TxsAvailable() <-chan int EnableTxsAvailable() } @@ -38,7 +38,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int, txs Txs) {} func (m MockMempool) Flush() {} -func (m MockMempool) TxsAvailable() chan int { return make(chan int) } +func (m MockMempool) TxsAvailable() <-chan int { return make(chan int) } func (m MockMempool) EnableTxsAvailable() {} //------------------------------------------------------