package mempool

import (
    "crypto/md5"
    "crypto/rand"
    "encoding/binary"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "testing"
    "time"

    "github.com/stretchr/testify/require"

    "github.com/tendermint/tendermint/abci/example/counter"
    "github.com/tendermint/tendermint/abci/example/kvstore"
    abci "github.com/tendermint/tendermint/abci/types"
    cfg "github.com/tendermint/tendermint/config"
    "github.com/tendermint/tendermint/libs/log"
    "github.com/tendermint/tendermint/proxy"
    "github.com/tendermint/tendermint/types"
)
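
// newMempoolWithApp starts an ABCI mempool connection to the app created by
// cc and returns a Mempool wired to it (the trailing 0 is the initial height).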
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
    config := cfg.ResetTestRoot("mempool_test")

    appConnMem, _ := cc.NewABCIClient()
    appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
    err := appConnMem.Start()
    if err != nil {
        panic(err)
    }
    mempool := NewMempool(config.Mempool, appConnMem, 0)
    mempool.SetLogger(log.TestingLogger())
    return mempool
}
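
// ensureNoFire fails the test if ch fires within timeoutMS milliseconds.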
func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
    timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
    select {
    case <-ch:
        t.Fatal("Expected not to fire")
    case <-timer.C:
    }
}
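
// ensureFire fails the test if ch does not fire within timeoutMS milliseconds.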
func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
    timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
    select {
    case <-ch:
    case <-timer.C:
        t.Fatal("Expected to fire")
    }
}
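
// checkTxs submits count random 20-byte txs to the mempool via CheckTx and
// returns them.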
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
    txs := make(types.Txs, count)
    for i := 0; i < count; i++ {
        txBytes := make([]byte, 20)
        txs[i] = txBytes
        _, err := rand.Read(txBytes)
        if err != nil {
            t.Error(err)
        }
        if err := mempool.CheckTx(txBytes, nil); err != nil {
            t.Fatalf("Error after CheckTx: %v", err)
        }
    }
    return txs
}
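
// TestTxsAvailable verifies that the TxsAvailable channel fires at most once
// per height, and only while uncommitted txs remain in the mempool.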
func TestTxsAvailable(t *testing.T) {
    app := kvstore.NewKVStoreApplication()
    cc := proxy.NewLocalClientCreator(app)
    mempool := newMempoolWithApp(cc)
    mempool.EnableTxsAvailable()

    timeoutMS := 500

    // with no txs, it shouldn't fire
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

    // send a bunch of txs, it should only fire once
    txs := checkTxs(t, mempool, 100)
    ensureFire(t, mempool.TxsAvailable(), timeoutMS)
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

    // call update with half the txs.
    // it should fire once now for the new height
    // since there are still txs left
    committedTxs, txs := txs[:50], txs[50:]
    if err := mempool.Update(1, committedTxs, nil); err != nil {
        t.Error(err)
    }
    ensureFire(t, mempool.TxsAvailable(), timeoutMS)
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

    // send a bunch more txs. we already fired for this height so it shouldn't fire again
    moreTxs := checkTxs(t, mempool, 50)
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

    // now call update with all the txs. it should not fire as there are no txs left
    committedTxs = append(txs, moreTxs...)
    if err := mempool.Update(2, committedTxs, nil); err != nil {
        t.Error(err)
    }
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

    // send a bunch more txs, it should only fire once
    checkTxs(t, mempool, 100)
    ensureFire(t, mempool.TxsAvailable(), timeoutMS)
    ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
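
// TestSerialReap exercises CheckTx, ReapMaxBytes, Update, and the tx cache
// against the counter app running in serial mode: duplicates must be rejected,
// and reap counts must track what has been committed and updated away.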
func TestSerialReap(t *testing.T) {
    app := counter.NewCounterApplication(true)
    app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
    cc := proxy.NewLocalClientCreator(app)

    mempool := newMempoolWithApp(cc)

    appConnCon, _ := cc.NewABCIClient()
    appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
    err := appConnCon.Start()
    require.Nil(t, err)

    cacheMap := make(map[string]struct{})
    deliverTxsRange := func(start, end int) {
        // Deliver some txs.
        for i := start; i < end; i++ {

            // This will succeed
            txBytes := make([]byte, 8)
            binary.BigEndian.PutUint64(txBytes, uint64(i))
            err := mempool.CheckTx(txBytes, nil)
            _, cached := cacheMap[string(txBytes)]
            if cached {
                require.NotNil(t, err, "expected error for cached tx")
            } else {
                require.Nil(t, err, "expected no err for uncached tx")
            }
            cacheMap[string(txBytes)] = struct{}{}

            // Duplicates are cached and should return error
            err = mempool.CheckTx(txBytes, nil)
            require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
        }
    }
    reapCheck := func(exp int) {
        txs := mempool.ReapMaxBytes(-1)
        require.Equal(t, exp, len(txs), "Expected to reap %v txs but got %v", exp, len(txs))
    }

    updateRange := func(start, end int) {
        txs := make([]types.Tx, 0)
        for i := start; i < end; i++ {
            txBytes := make([]byte, 8)
            binary.BigEndian.PutUint64(txBytes, uint64(i))
            txs = append(txs, txBytes)
        }
        if err := mempool.Update(0, txs, nil); err != nil {
            t.Error(err)
        }
    }

    commitRange := func(start, end int) {
        // Deliver some txs.
        for i := start; i < end; i++ {
            txBytes := make([]byte, 8)
            binary.BigEndian.PutUint64(txBytes, uint64(i))
            res, err := appConnCon.DeliverTxSync(txBytes)
            if err != nil {
                t.Errorf("Client error committing tx: %v", err)
            }
            if res.IsErr() {
                t.Errorf("Error committing tx. Code:%v result:%X log:%v",
                    res.Code, res.Data, res.Log)
            }
        }
        res, err := appConnCon.CommitSync()
        if err != nil {
            t.Errorf("Client error committing: %v", err)
        }
        if len(res.Data) != 8 {
            t.Errorf("Error committing. Hash:%X", res.Data)
        }
    }
    //----------------------------------------

    // Deliver some txs.
    deliverTxsRange(0, 100)

    // Reap the txs.
    reapCheck(100)

    // Reap again. We should get the same amount
    reapCheck(100)

    // Deliver 0 to 999, we should reap 900 new txs
    // because 100 were already counted.
    deliverTxsRange(0, 1000)

    // Reap the txs.
    reapCheck(1000)

    // Reap again. We should get the same amount
    reapCheck(1000)

    // Commit from the consensus AppConn
    commitRange(0, 500)
    updateRange(0, 500)

    // We should have 500 left.
    reapCheck(500)

    // Deliver 100 invalid txs and 100 valid txs
    deliverTxsRange(900, 1100)

    // We should have 600 now.
    reapCheck(600)
}
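
// TestCacheRemove checks that Remove keeps the cache's backing map and
// linked list in sync.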
func TestCacheRemove(t *testing.T) {
    cache := newMapTxCache(100)
    numTxs := 10
    txs := make([][]byte, numTxs)
    for i := 0; i < numTxs; i++ {
        // probability of collision is 2**-256
        txBytes := make([]byte, 32)
        rand.Read(txBytes)
        txs[i] = txBytes
        cache.Push(txBytes)
        // make sure it's added to both the linked list and the map
        require.Equal(t, i+1, len(cache.map_))
        require.Equal(t, i+1, cache.list.Len())
    }
    for i := 0; i < numTxs; i++ {
        cache.Remove(txs[i])
        // make sure it's removed from both the map and the linked list
        require.Equal(t, numTxs-(i+1), len(cache.map_))
        require.Equal(t, numTxs-(i+1), cache.list.Len())
    }
}
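
// TestMempoolCloseWAL verifies that CloseWAL discards the WAL (so later
// writes don't reach it) while leaving the WAL file itself on disk.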
func TestMempoolCloseWAL(t *testing.T) {
    // 1. Create the temporary directory for mempool and WAL testing.
    rootDir, err := ioutil.TempDir("", "mempool-test")
    require.Nil(t, err, "expecting successful tmpdir creation")
    defer os.RemoveAll(rootDir)

    // 2. Ensure that it doesn't contain any elements -- Sanity check
    m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
    require.Nil(t, err, "successful globbing expected")
    require.Equal(t, 0, len(m1), "no matches yet")

    // 3. Create the mempool
    wcfg := cfg.DefaultMempoolConfig()
    wcfg.RootDir = rootDir
    app := kvstore.NewKVStoreApplication()
    cc := proxy.NewLocalClientCreator(app)
    appConnMem, _ := cc.NewABCIClient()
    mempool := NewMempool(wcfg, appConnMem, 10)
    mempool.InitWAL()

    // 4. Ensure that the directory contains the WAL file
    m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
    require.Nil(t, err, "successful globbing expected")
    require.Equal(t, 1, len(m2), "expecting the WAL file to be created")

    // 5. Write some contents to the WAL
    mempool.CheckTx(types.Tx([]byte("foo")), nil)
    walFilepath := mempool.wal.Path
    sum1 := checksumFile(walFilepath, t)

    // 6. Sanity check to ensure that the written TX matches the expectation.
    require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")

    // 7. Invoke CloseWAL() and ensure it discards the
    // WAL, so any further writes won't go through.
    require.True(t, mempool.CloseWAL(), "CloseWAL should close the WAL")
    mempool.CheckTx(types.Tx([]byte("bar")), nil)
    sum2 := checksumFile(walFilepath, t)
    require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")

    // 8. Second CloseWAL should do nothing
    require.False(t, mempool.CloseWAL(), "second CloseWAL should be a no-op")

    // 9. Sanity check to ensure that the WAL file still exists
    m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
    require.Nil(t, err, "successful globbing expected")
    require.Equal(t, 1, len(m3), "expecting the WAL file to remain on disk")
}
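
// BenchmarkCacheInsertTime measures Push into the tx cache; tx generation
// happens before b.ResetTimer so only the inserts are timed.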
func BenchmarkCacheInsertTime(b *testing.B) {
    cache := newMapTxCache(b.N)
    txs := make([][]byte, b.N)
    for i := 0; i < b.N; i++ {
        txs[i] = make([]byte, 8)
        binary.BigEndian.PutUint64(txs[i], uint64(i))
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        cache.Push(txs[i])
    }
}

// This benchmark is probably skewed, since we actually will be removing
// txs in parallel, which may cause some overhead due to mutex locking.
func BenchmarkCacheRemoveTime(b *testing.B) {
    cache := newMapTxCache(b.N)
    txs := make([][]byte, b.N)
    for i := 0; i < b.N; i++ {
        txs[i] = make([]byte, 8)
        binary.BigEndian.PutUint64(txs[i], uint64(i))
        cache.Push(txs[i])
    }
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        cache.Remove(txs[i])
    }
}
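
// checksumIt returns the hex-encoded MD5 digest of data.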
func checksumIt(data []byte) string {
    h := md5.New()
    h.Write(data)
    return fmt.Sprintf("%x", h.Sum(nil))
}
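
// checksumFile reads the file at p and returns its MD5 checksum, failing the
// test if the read errors.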
func checksumFile(p string, t *testing.T) string {
    data, err := ioutil.ReadFile(p)
    require.Nil(t, err, "expecting successful read of %q", p)
    return checksumIt(data)
}