You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
8.1 KiB

8 years ago
8 years ago
8 years ago
  1. package mempool
  2. import (
  3. "crypto/md5"
  4. "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "path/filepath"
  10. "testing"
  11. "time"
  12. "github.com/tendermint/abci/example/counter"
  13. "github.com/tendermint/abci/example/dummy"
  14. abci "github.com/tendermint/abci/types"
  15. cmn "github.com/tendermint/tmlibs/common"
  16. "github.com/tendermint/tmlibs/log"
  17. cfg "github.com/tendermint/tendermint/config"
  18. "github.com/tendermint/tendermint/proxy"
  19. "github.com/tendermint/tendermint/types"
  20. "github.com/stretchr/testify/assert"
  21. "github.com/stretchr/testify/require"
  22. )
  23. func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
  24. config := cfg.ResetTestRoot("mempool_test")
  25. appConnMem, _ := cc.NewABCIClient()
  26. appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
  27. err := appConnMem.Start()
  28. if err != nil {
  29. panic(err)
  30. }
  31. mempool := NewMempool(config.Mempool, appConnMem, 0)
  32. mempool.SetLogger(log.TestingLogger())
  33. return mempool
  34. }
  35. func ensureNoFire(t *testing.T, ch <-chan int64, timeoutMS int) {
  36. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  37. select {
  38. case <-ch:
  39. t.Fatal("Expected not to fire")
  40. case <-timer.C:
  41. }
  42. }
  43. func ensureFire(t *testing.T, ch <-chan int64, timeoutMS int) {
  44. timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  45. select {
  46. case <-ch:
  47. case <-timer.C:
  48. t.Fatal("Expected to fire")
  49. }
  50. }
  51. func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
  52. txs := make(types.Txs, count)
  53. for i := 0; i < count; i++ {
  54. txBytes := make([]byte, 20)
  55. txs[i] = txBytes
  56. _, err := rand.Read(txBytes)
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. if err := mempool.CheckTx(txBytes, nil); err != nil {
  61. t.Fatalf("Error after CheckTx: %v", err)
  62. }
  63. }
  64. return txs
  65. }
  66. func TestTxsAvailable(t *testing.T) {
  67. app := dummy.NewDummyApplication()
  68. cc := proxy.NewLocalClientCreator(app)
  69. mempool := newMempoolWithApp(cc)
  70. mempool.EnableTxsAvailable()
  71. timeoutMS := 500
  72. // with no txs, it shouldnt fire
  73. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  74. // send a bunch of txs, it should only fire once
  75. txs := checkTxs(t, mempool, 100)
  76. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  77. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  78. // call update with half the txs.
  79. // it should fire once now for the new height
  80. // since there are still txs left
  81. committedTxs, txs := txs[:50], txs[50:]
  82. if err := mempool.Update(1, committedTxs); err != nil {
  83. t.Error(err)
  84. }
  85. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  86. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  87. // send a bunch more txs. we already fired for this height so it shouldnt fire again
  88. moreTxs := checkTxs(t, mempool, 50)
  89. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  90. // now call update with all the txs. it should not fire as there are no txs left
  91. committedTxs = append(txs, moreTxs...)
  92. if err := mempool.Update(2, committedTxs); err != nil {
  93. t.Error(err)
  94. }
  95. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  96. // send a bunch more txs, it should only fire once
  97. checkTxs(t, mempool, 100)
  98. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  99. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  100. }
  101. func TestSerialReap(t *testing.T) {
  102. app := counter.NewCounterApplication(true)
  103. app.SetOption(abci.RequestSetOption{"serial", "on"})
  104. cc := proxy.NewLocalClientCreator(app)
  105. mempool := newMempoolWithApp(cc)
  106. appConnCon, _ := cc.NewABCIClient()
  107. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  108. err := appConnCon.Start()
  109. assert.Nil(t, err)
  110. cacheMap := make(map[string]struct{})
  111. deliverTxsRange := func(start, end int) {
  112. // Deliver some txs.
  113. for i := start; i < end; i++ {
  114. // This will succeed
  115. txBytes := make([]byte, 8)
  116. binary.BigEndian.PutUint64(txBytes, uint64(i))
  117. err := mempool.CheckTx(txBytes, nil)
  118. _, cached := cacheMap[string(txBytes)]
  119. if cached {
  120. assert.NotNil(t, err, "expected error for cached tx")
  121. } else {
  122. assert.Nil(t, err, "expected no err for uncached tx")
  123. }
  124. cacheMap[string(txBytes)] = struct{}{}
  125. // Duplicates are cached and should return error
  126. err = mempool.CheckTx(txBytes, nil)
  127. assert.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
  128. }
  129. }
  130. reapCheck := func(exp int) {
  131. txs := mempool.Reap(-1)
  132. assert.Equal(t, len(txs), exp, cmn.Fmt("Expected to reap %v txs but got %v", exp, len(txs)))
  133. }
  134. updateRange := func(start, end int) {
  135. txs := make([]types.Tx, 0)
  136. for i := start; i < end; i++ {
  137. txBytes := make([]byte, 8)
  138. binary.BigEndian.PutUint64(txBytes, uint64(i))
  139. txs = append(txs, txBytes)
  140. }
  141. if err := mempool.Update(0, txs); err != nil {
  142. t.Error(err)
  143. }
  144. }
  145. commitRange := func(start, end int) {
  146. // Deliver some txs.
  147. for i := start; i < end; i++ {
  148. txBytes := make([]byte, 8)
  149. binary.BigEndian.PutUint64(txBytes, uint64(i))
  150. res, err := appConnCon.DeliverTxSync(txBytes)
  151. if err != nil {
  152. t.Errorf("Client error committing tx: %v", err)
  153. }
  154. if res.IsErr() {
  155. t.Errorf("Error committing tx. Code:%v result:%X log:%v",
  156. res.Code, res.Data, res.Log)
  157. }
  158. }
  159. res, err := appConnCon.CommitSync()
  160. if err != nil {
  161. t.Errorf("Client error committing: %v", err)
  162. }
  163. if len(res.Data) != 8 {
  164. t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log)
  165. }
  166. }
  167. //----------------------------------------
  168. // Deliver some txs.
  169. deliverTxsRange(0, 100)
  170. // Reap the txs.
  171. reapCheck(100)
  172. // Reap again. We should get the same amount
  173. reapCheck(100)
  174. // Deliver 0 to 999, we should reap 900 new txs
  175. // because 100 were already counted.
  176. deliverTxsRange(0, 1000)
  177. // Reap the txs.
  178. reapCheck(1000)
  179. // Reap again. We should get the same amount
  180. reapCheck(1000)
  181. // Commit from the conensus AppConn
  182. commitRange(0, 500)
  183. updateRange(0, 500)
  184. // We should have 500 left.
  185. reapCheck(500)
  186. // Deliver 100 invalid txs and 100 valid txs
  187. deliverTxsRange(900, 1100)
  188. // We should have 600 now.
  189. reapCheck(600)
  190. }
  191. func TestMempoolCloseWAL(t *testing.T) {
  192. // 1. Create the temporary directory for mempool and WAL testing.
  193. rootDir, err := ioutil.TempDir("", "mempool-test")
  194. require.Nil(t, err, "expecting successful tmpdir creation")
  195. defer os.RemoveAll(rootDir)
  196. // 2. Ensure that it doesn't contain any elements -- Sanity check
  197. m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
  198. require.Nil(t, err, "successful globbing expected")
  199. require.Equal(t, 0, len(m1), "no matches yet")
  200. // 3. Create the mempool
  201. wcfg := *(cfg.DefaultMempoolConfig())
  202. wcfg.RootDir = rootDir
  203. app := dummy.NewDummyApplication()
  204. cc := proxy.NewLocalClientCreator(app)
  205. appConnMem, _ := cc.NewABCIClient()
  206. mempool := NewMempool(&wcfg, appConnMem, 10)
  207. // 4. Ensure that the directory contains the WAL file
  208. m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
  209. require.Nil(t, err, "successful globbing expected")
  210. require.Equal(t, 1, len(m2), "expecting the wal match in")
  211. // 5. Write some contents to the WAL
  212. mempool.CheckTx(types.Tx([]byte("foo")), nil)
  213. walFilepath := mempool.wal.Path
  214. sum1 := checksumFile(walFilepath, t)
  215. // 6. Sanity check to ensure that the written TX matches the expectation.
  216. require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
  217. // 7. Invoke CloseWAL() and ensure it discards the
  218. // WAL thus any other write won't go through.
  219. require.True(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
  220. mempool.CheckTx(types.Tx([]byte("bar")), nil)
  221. sum2 := checksumFile(walFilepath, t)
  222. require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
  223. // 8. Second CloseWAL should do nothing
  224. require.False(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
  225. // 9. Sanity check to ensure that the WAL file still exists
  226. m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
  227. require.Nil(t, err, "successful globbing expected")
  228. require.Equal(t, 1, len(m3), "expecting the wal match in")
  229. }
  230. func checksumIt(data []byte) string {
  231. h := md5.New()
  232. h.Write(data)
  233. return fmt.Sprintf("%x", h.Sum(nil))
  234. }
  235. func checksumFile(p string, t *testing.T) string {
  236. data, err := ioutil.ReadFile(p)
  237. require.Nil(t, err, "expecting successful read of %q", p)
  238. return checksumIt(data)
  239. }