You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

280 lines
7.7 KiB

8 years ago
8 years ago
8 years ago
  1. package mempool
  2. import (
  3. "crypto/md5"
  4. "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "path/filepath"
  10. "testing"
  11. "time"
  12. "github.com/tendermint/abci/example/counter"
  13. "github.com/tendermint/abci/example/dummy"
  14. "github.com/tendermint/tmlibs/log"
  15. cfg "github.com/tendermint/tendermint/config"
  16. "github.com/tendermint/tendermint/proxy"
  17. "github.com/tendermint/tendermint/types"
  18. "github.com/stretchr/testify/require"
  19. )
  20. func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
  21. config := cfg.ResetTestRoot("mempool_test")
  22. appConnMem, _ := cc.NewABCIClient()
  23. appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
  24. _, err := appConnMem.Start()
  25. if err != nil {
  26. panic(err)
  27. }
  28. mempool := NewMempool(config.Mempool, appConnMem, 0)
  29. mempool.SetLogger(log.TestingLogger())
  30. return mempool
  31. }
  // ensureNoFire waits up to timeoutMS milliseconds and fails the test
  // immediately if anything is received on ch during that window. It returns
  // normally once the window elapses without a receive.
  func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) {
  	window := time.Duration(timeoutMS) * time.Millisecond
  	expired := time.After(window)

  	select {
  	case <-expired:
  		// Nothing arrived within the window, which is what we want.
  	case <-ch:
  		t.Fatal("Expected not to fire")
  	}
  }
  // ensureFire blocks until a value is received on ch, failing the test if
  // nothing arrives within timeoutMS milliseconds. The received value is
  // consumed and discarded.
  func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) {
  	window := time.Duration(timeoutMS) * time.Millisecond
  	expired := time.After(window)

  	select {
  	case <-expired:
  		t.Fatal("Expected to fire")
  	case <-ch:
  		// Got the signal in time.
  	}
  }
  48. func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
  49. txs := make(types.Txs, count)
  50. for i := 0; i < count; i++ {
  51. txBytes := make([]byte, 20)
  52. txs[i] = txBytes
  53. _, err := rand.Read(txBytes)
  54. if err != nil {
  55. t.Error(err)
  56. }
  57. if err := mempool.CheckTx(txBytes, nil); err != nil {
  58. t.Fatalf("Error after CheckTx: %v", err)
  59. }
  60. }
  61. return txs
  62. }
  63. func TestTxsAvailable(t *testing.T) {
  64. app := dummy.NewDummyApplication()
  65. cc := proxy.NewLocalClientCreator(app)
  66. mempool := newMempoolWithApp(cc)
  67. mempool.EnableTxsAvailable()
  68. timeoutMS := 500
  69. // with no txs, it shouldnt fire
  70. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  71. // send a bunch of txs, it should only fire once
  72. txs := checkTxs(t, mempool, 100)
  73. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  74. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  75. // call update with half the txs.
  76. // it should fire once now for the new height
  77. // since there are still txs left
  78. committedTxs, txs := txs[:50], txs[50:]
  79. if err := mempool.Update(1, committedTxs); err != nil {
  80. t.Error(err)
  81. }
  82. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  83. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  84. // send a bunch more txs. we already fired for this height so it shouldnt fire again
  85. moreTxs := checkTxs(t, mempool, 50)
  86. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  87. // now call update with all the txs. it should not fire as there are no txs left
  88. committedTxs = append(txs, moreTxs...)
  89. if err := mempool.Update(2, committedTxs); err != nil {
  90. t.Error(err)
  91. }
  92. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  93. // send a bunch more txs, it should only fire once
  94. checkTxs(t, mempool, 100)
  95. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  96. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  97. }
  98. func TestSerialReap(t *testing.T) {
  99. app := counter.NewCounterApplication(true)
  100. app.SetOption("serial", "on")
  101. cc := proxy.NewLocalClientCreator(app)
  102. mempool := newMempoolWithApp(cc)
  103. appConnCon, _ := cc.NewABCIClient()
  104. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  105. if _, err := appConnCon.Start(); err != nil {
  106. t.Fatalf("Error starting ABCI client: %v", err.Error())
  107. }
  108. deliverTxsRange := func(start, end int) {
  109. // Deliver some txs.
  110. for i := start; i < end; i++ {
  111. // This will succeed
  112. txBytes := make([]byte, 8)
  113. binary.BigEndian.PutUint64(txBytes, uint64(i))
  114. err := mempool.CheckTx(txBytes, nil)
  115. if err != nil {
  116. t.Fatalf("Error after CheckTx: %v", err)
  117. }
  118. // This will fail because not serial (incrementing)
  119. // However, error should still be nil.
  120. // It just won't show up on Reap().
  121. err = mempool.CheckTx(txBytes, nil)
  122. if err != nil {
  123. t.Fatalf("Error after CheckTx: %v", err)
  124. }
  125. }
  126. }
  127. reapCheck := func(exp int) {
  128. txs := mempool.Reap(-1)
  129. if len(txs) != exp {
  130. t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs))
  131. }
  132. }
  133. updateRange := func(start, end int) {
  134. txs := make([]types.Tx, 0)
  135. for i := start; i < end; i++ {
  136. txBytes := make([]byte, 8)
  137. binary.BigEndian.PutUint64(txBytes, uint64(i))
  138. txs = append(txs, txBytes)
  139. }
  140. if err := mempool.Update(0, txs); err != nil {
  141. t.Error(err)
  142. }
  143. }
  144. commitRange := func(start, end int) {
  145. // Deliver some txs.
  146. for i := start; i < end; i++ {
  147. txBytes := make([]byte, 8)
  148. binary.BigEndian.PutUint64(txBytes, uint64(i))
  149. res := appConnCon.DeliverTxSync(txBytes)
  150. if !res.IsOK() {
  151. t.Errorf("Error committing tx. Code:%v result:%X log:%v",
  152. res.Code, res.Data, res.Log)
  153. }
  154. }
  155. res := appConnCon.CommitSync()
  156. if len(res.Data) != 8 {
  157. t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log)
  158. }
  159. }
  160. //----------------------------------------
  161. // Deliver some txs.
  162. deliverTxsRange(0, 100)
  163. // Reap the txs.
  164. reapCheck(100)
  165. // Reap again. We should get the same amount
  166. reapCheck(100)
  167. // Deliver 0 to 999, we should reap 900 new txs
  168. // because 100 were already counted.
  169. deliverTxsRange(0, 1000)
  170. // Reap the txs.
  171. reapCheck(1000)
  172. // Reap again. We should get the same amount
  173. reapCheck(1000)
  174. // Commit from the conensus AppConn
  175. commitRange(0, 500)
  176. updateRange(0, 500)
  177. // We should have 500 left.
  178. reapCheck(500)
  179. // Deliver 100 invalid txs and 100 valid txs
  180. deliverTxsRange(900, 1100)
  181. // We should have 600 now.
  182. reapCheck(600)
  183. }
  184. func TestMempoolCloseWAL(t *testing.T) {
  185. // 1. Create the temporary directory for mempool and WAL testing.
  186. rootDir, err := ioutil.TempDir("", "mempool-test")
  187. require.Nil(t, err, "expecting successful tmpdir creation")
  188. defer os.RemoveAll(rootDir)
  189. // 2. Ensure that it doesn't contain any elements -- Sanity check
  190. m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
  191. require.Nil(t, err, "successful globbing expected")
  192. require.Equal(t, 0, len(m1), "no matches yet")
  193. // 3. Create the mempool
  194. wcfg := *(cfg.DefaultMempoolConfig())
  195. wcfg.RootDir = rootDir
  196. app := dummy.NewDummyApplication()
  197. cc := proxy.NewLocalClientCreator(app)
  198. appConnMem, _ := cc.NewABCIClient()
  199. mempool := NewMempool(&wcfg, appConnMem, 10)
  200. // 4. Ensure that the directory contains the WAL file
  201. m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
  202. require.Nil(t, err, "successful globbing expected")
  203. require.Equal(t, 1, len(m2), "expecting the wal match in")
  204. // 5. Write some contents to the WAL
  205. mempool.CheckTx(types.Tx([]byte("foo")), nil)
  206. walFilepath := mempool.wal.Path
  207. sum1 := checksumFile(walFilepath, t)
  208. // 6. Sanity check to ensure that the written TX matches the expectation.
  209. require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
  210. // 7. Invoke CloseWAL() and ensure it discards the
  211. // WAL thus any other write won't go through.
  212. require.True(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
  213. mempool.CheckTx(types.Tx([]byte("bar")), nil)
  214. sum2 := checksumFile(walFilepath, t)
  215. require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
  216. // 8. Second CloseWAL should do nothing
  217. require.False(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
  218. // 9. Sanity check to ensure that the WAL file still exists
  219. m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
  220. require.Nil(t, err, "successful globbing expected")
  221. require.Equal(t, 1, len(m3), "expecting the wal match in")
  222. }
  // checksumIt returns the MD5 digest of data as a lowercase hexadecimal string.
  func checksumIt(data []byte) string {
  	digest := md5.Sum(data)
  	return fmt.Sprintf("%x", digest)
  }
  228. func checksumFile(p string, t *testing.T) string {
  229. data, err := ioutil.ReadFile(p)
  230. require.Nil(t, err, "expecting successful read of %q", p)
  231. return checksumIt(data)
  232. }