You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

232 lines
7.0 KiB

  1. package consensus
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/tendermint/abci/example/code"
  9. abci "github.com/tendermint/abci/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. func init() {
  14. config = ResetConfig("consensus_mempool_test")
  15. }
  16. func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
  17. config := ResetConfig("consensus_mempool_txs_available_test")
  18. config.Consensus.CreateEmptyBlocks = false
  19. state, privVals := randGenesisState(1, false, 10)
  20. cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
  21. cs.mempool.EnableTxsAvailable()
  22. height, round := cs.Height, cs.Round
  23. newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
  24. startTestRound(cs, height, round)
  25. ensureNewStep(newBlockCh) // first block gets committed
  26. ensureNoNewStep(newBlockCh)
  27. deliverTxsRange(cs, 0, 1)
  28. ensureNewStep(newBlockCh) // commit txs
  29. ensureNewStep(newBlockCh) // commit updated app hash
  30. ensureNoNewStep(newBlockCh)
  31. }
  32. func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
  33. config := ResetConfig("consensus_mempool_txs_available_test")
  34. config.Consensus.CreateEmptyBlocksInterval = int(ensureTimeout.Seconds())
  35. state, privVals := randGenesisState(1, false, 10)
  36. cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
  37. cs.mempool.EnableTxsAvailable()
  38. height, round := cs.Height, cs.Round
  39. newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
  40. startTestRound(cs, height, round)
  41. ensureNewStep(newBlockCh) // first block gets committed
  42. ensureNoNewStep(newBlockCh) // then we dont make a block ...
  43. ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed
  44. }
  45. func TestMempoolProgressInHigherRound(t *testing.T) {
  46. config := ResetConfig("consensus_mempool_txs_available_test")
  47. config.Consensus.CreateEmptyBlocks = false
  48. state, privVals := randGenesisState(1, false, 10)
  49. cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication())
  50. cs.mempool.EnableTxsAvailable()
  51. height, round := cs.Height, cs.Round
  52. newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
  53. newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound)
  54. timeoutCh := subscribe(cs.eventBus, types.EventQueryTimeoutPropose)
  55. cs.setProposal = func(proposal *types.Proposal) error {
  56. if cs.Height == 2 && cs.Round == 0 {
  57. // dont set the proposal in round 0 so we timeout and
  58. // go to next round
  59. cs.Logger.Info("Ignoring set proposal at height 2, round 0")
  60. return nil
  61. }
  62. return cs.defaultSetProposal(proposal)
  63. }
  64. startTestRound(cs, height, round)
  65. ensureNewStep(newRoundCh) // first round at first height
  66. ensureNewStep(newBlockCh) // first block gets committed
  67. ensureNewStep(newRoundCh) // first round at next height
  68. deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
  69. <-timeoutCh
  70. ensureNewStep(newRoundCh) // wait for the next round
  71. ensureNewStep(newBlockCh) // now we can commit the block
  72. }
  73. func deliverTxsRange(cs *ConsensusState, start, end int) {
  74. // Deliver some txs.
  75. for i := start; i < end; i++ {
  76. txBytes := make([]byte, 8)
  77. binary.BigEndian.PutUint64(txBytes, uint64(i))
  78. err := cs.mempool.CheckTx(txBytes, nil)
  79. if err != nil {
  80. panic(cmn.Fmt("Error after CheckTx: %v", err))
  81. }
  82. }
  83. }
  84. func TestMempoolTxConcurrentWithCommit(t *testing.T) {
  85. state, privVals := randGenesisState(1, false, 10)
  86. cs := newConsensusState(state, privVals[0], NewCounterApplication())
  87. height, round := cs.Height, cs.Round
  88. newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
  89. NTxs := 10000
  90. go deliverTxsRange(cs, 0, NTxs)
  91. startTestRound(cs, height, round)
  92. for nTxs := 0; nTxs < NTxs; {
  93. ticker := time.NewTicker(time.Second * 30)
  94. select {
  95. case b := <-newBlockCh:
  96. evt := b.(types.EventDataNewBlock)
  97. nTxs += int(evt.Block.Header.NumTxs)
  98. case <-ticker.C:
  99. panic("Timed out waiting to commit blocks with transactions")
  100. }
  101. }
  102. }
  103. func TestMempoolRmBadTx(t *testing.T) {
  104. state, privVals := randGenesisState(1, false, 10)
  105. app := NewCounterApplication()
  106. cs := newConsensusState(state, privVals[0], app)
  107. // increment the counter by 1
  108. txBytes := make([]byte, 8)
  109. binary.BigEndian.PutUint64(txBytes, uint64(0))
  110. resDeliver := app.DeliverTx(txBytes)
  111. assert.False(t, resDeliver.IsErr(), cmn.Fmt("expected no error. got %v", resDeliver))
  112. resCommit := app.Commit()
  113. assert.True(t, len(resCommit.Data) > 0)
  114. emptyMempoolCh := make(chan struct{})
  115. checkTxRespCh := make(chan struct{})
  116. go func() {
  117. // Try to send the tx through the mempool.
  118. // CheckTx should not err, but the app should return a bad abci code
  119. // and the tx should get removed from the pool
  120. err := cs.mempool.CheckTx(txBytes, func(r *abci.Response) {
  121. if r.GetCheckTx().Code != code.CodeTypeBadNonce {
  122. t.Fatalf("expected checktx to return bad nonce, got %v", r)
  123. }
  124. checkTxRespCh <- struct{}{}
  125. })
  126. if err != nil {
  127. t.Fatalf("Error after CheckTx: %v", err)
  128. }
  129. // check for the tx
  130. for {
  131. txs := cs.mempool.Reap(1)
  132. if len(txs) == 0 {
  133. emptyMempoolCh <- struct{}{}
  134. return
  135. }
  136. time.Sleep(10 * time.Millisecond)
  137. }
  138. }()
  139. // Wait until the tx returns
  140. ticker := time.After(time.Second * 5)
  141. select {
  142. case <-checkTxRespCh:
  143. // success
  144. case <-ticker:
  145. t.Fatalf("Timed out waiting for tx to return")
  146. }
  147. // Wait until the tx is removed
  148. ticker = time.After(time.Second * 5)
  149. select {
  150. case <-emptyMempoolCh:
  151. // success
  152. case <-ticker:
  153. t.Fatalf("Timed out waiting for tx to be removed")
  154. }
  155. }
  156. // CounterApplication that maintains a mempool state and resets it upon commit
  157. type CounterApplication struct {
  158. abci.BaseApplication
  159. txCount int
  160. mempoolTxCount int
  161. }
  162. func NewCounterApplication() *CounterApplication {
  163. return &CounterApplication{}
  164. }
  165. func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
  166. return abci.ResponseInfo{Data: cmn.Fmt("txs:%v", app.txCount)}
  167. }
  168. func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx {
  169. txValue := txAsUint64(tx)
  170. if txValue != uint64(app.txCount) {
  171. return abci.ResponseDeliverTx{
  172. Code: code.CodeTypeBadNonce,
  173. Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)}
  174. }
  175. app.txCount++
  176. return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
  177. }
  178. func (app *CounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx {
  179. txValue := txAsUint64(tx)
  180. if txValue != uint64(app.mempoolTxCount) {
  181. return abci.ResponseCheckTx{
  182. Code: code.CodeTypeBadNonce,
  183. Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)}
  184. }
  185. app.mempoolTxCount++
  186. return abci.ResponseCheckTx{Code: code.CodeTypeOK}
  187. }
  188. func txAsUint64(tx []byte) uint64 {
  189. tx8 := make([]byte, 8)
  190. copy(tx8[len(tx8)-len(tx):], tx)
  191. return binary.BigEndian.Uint64(tx8)
  192. }
  193. func (app *CounterApplication) Commit() abci.ResponseCommit {
  194. app.mempoolTxCount = app.txCount
  195. if app.txCount == 0 {
  196. return abci.ResponseCommit{}
  197. }
  198. hash := make([]byte, 8)
  199. binary.BigEndian.PutUint64(hash, uint64(app.txCount))
  200. return abci.ResponseCommit{Data: hash}
  201. }