package mempool

import (
	"crypto/md5"
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/abci/example/counter"
	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)
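// newMempoolWithApp returns a fresh mempool connected to the given ABCI
// app, backed by a throwaway test config root.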
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
	config := cfg.ResetTestRoot("mempool_test")
	appConnMem, _ := cc.NewABCIClient()
	appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
	err := appConnMem.Start()
	if err != nil {
		panic(err)
	}
	mempool := NewMempool(config.Mempool, appConnMem, 0)
	mempool.SetLogger(log.TestingLogger())
	return mempool
}
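// ensureNoFire fails the test if ch fires within timeoutMS milliseconds.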
func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	select {
	case <-ch:
		t.Fatal("Expected not to fire")
	case <-timer.C:
	}
}
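// ensureFire fails the test if ch does not fire within timeoutMS milliseconds.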
func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	select {
	case <-ch:
	case <-timer.C:
		t.Fatal("Expected to fire")
	}
}
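// checkTxs submits count random 20-byte txs to the mempool via CheckTx
// and returns them.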
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
	txs := make(types.Txs, count)
	for i := 0; i < count; i++ {
		txBytes := make([]byte, 20)
		txs[i] = txBytes
		_, err := rand.Read(txBytes)
		if err != nil {
			t.Error(err)
		}
		if err := mempool.CheckTx(txBytes, nil); err != nil {
			t.Fatalf("Error after CheckTx: %v", err)
		}
	}
	return txs
}
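// TestReapMaxBytesMaxGas checks that ReapMaxBytesMaxGas respects both the
// byte and gas limits, treating -1 as "no limit".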
func TestReapMaxBytesMaxGas(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)

	// Ensure gas calculation behaves as expected.
	checkTxs(t, mempool, 1)
	tx0 := mempool.TxsFront().Value.(*mempoolTx)
	// Assert that the kvstore app reports gas wanted = 1.
	require.Equal(t, int64(1), app.CheckTx(tx0.tx).GasWanted, "KVStore had a gas value not equal to 1")
	require.Equal(t, int64(1), tx0.gasWanted, "transaction's gas was set incorrectly")
	// Ensure each tx is 20 bytes long.
	require.Equal(t, 20, len(tx0.tx), "Tx is not 20 bytes long")
	mempool.Flush()

	// Each table-driven test creates numTxsToCreate txs with CheckTx,
	// and at the end clears all remaining txs.
	// Each tx is 20 bytes + amino overhead = 21 bytes, and wants 1 gas.
	tests := []struct {
		numTxsToCreate int
		maxBytes       int
		maxGas         int64
		expectedNumTxs int
	}{
		{20, -1, -1, 20},
		{20, -1, 0, 0},
		{20, -1, 10, 10},
		{20, -1, 30, 20},
		{20, 0, -1, 0},
		{20, 0, 10, 0},
		{20, 10, 10, 0},
		{20, 21, 10, 1},
		{20, 210, -1, 10},
		{20, 210, 5, 5},
		{20, 210, 10, 10},
		{20, 210, 15, 10},
		{20, 20000, -1, 20},
		{20, 20000, 5, 5},
		{20, 20000, 30, 20},
	}
	for tcIndex, tt := range tests {
		checkTxs(t, mempool, tt.numTxsToCreate)
		got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
		assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
			len(got), tt.expectedNumTxs, tcIndex)
		mempool.Flush()
	}
}
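// TestTxsAvailable checks that the TxsAvailable channel fires at most once
// per height, and only while the mempool still holds txs.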
func TestTxsAvailable(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)
	mempool.EnableTxsAvailable()

	timeoutMS := 500

	// With no txs, it shouldn't fire.
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// Send a bunch of txs; it should only fire once.
	txs := checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// Call update with half the txs.
	// It should fire once now for the new height,
	// since there are still txs left.
	committedTxs, txs := txs[:50], txs[50:]
	if err := mempool.Update(1, committedTxs, nil); err != nil {
		t.Error(err)
	}
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// Send a bunch more txs. We already fired for this height, so it shouldn't fire again.
	moreTxs := checkTxs(t, mempool, 50)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// Now call update with all the txs. It should not fire as there are no txs left.
	committedTxs = append(txs, moreTxs...)
	if err := mempool.Update(2, committedTxs, nil); err != nil {
		t.Error(err)
	}
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// Send a bunch more txs; it should only fire once.
	checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
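// TestSerialReap drives the counter app in serial mode, interleaving
// CheckTx, Reap, Update, and consensus-connection commits to verify that
// reaping reflects exactly the txs the mempool still holds.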
func TestSerialReap(t *testing.T) {
	app := counter.NewCounterApplication(true)
	app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
	cc := proxy.NewLocalClientCreator(app)

	mempool := newMempoolWithApp(cc)
	appConnCon, _ := cc.NewABCIClient()
	appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
	err := appConnCon.Start()
	require.Nil(t, err)

	cacheMap := make(map[string]struct{})
	deliverTxsRange := func(start, end int) {
		// Deliver some txs.
		for i := start; i < end; i++ {
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			// CheckTx should succeed unless the tx is already cached.
			err := mempool.CheckTx(txBytes, nil)
			_, cached := cacheMap[string(txBytes)]
			if cached {
				require.NotNil(t, err, "expected error for cached tx")
			} else {
				require.Nil(t, err, "expected no err for uncached tx")
			}
			cacheMap[string(txBytes)] = struct{}{}

			// Duplicates are cached and should return an error.
			err = mempool.CheckTx(txBytes, nil)
			require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
		}
	}

	reapCheck := func(exp int) {
		txs := mempool.ReapMaxBytesMaxGas(-1, -1)
		require.Equal(t, exp, len(txs), fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
	}

	updateRange := func(start, end int) {
		txs := make([]types.Tx, 0)
		for i := start; i < end; i++ {
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			txs = append(txs, txBytes)
		}
		if err := mempool.Update(0, txs, nil); err != nil {
			t.Error(err)
		}
	}

	commitRange := func(start, end int) {
		// Deliver some txs on the consensus connection.
		for i := start; i < end; i++ {
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			res, err := appConnCon.DeliverTxSync(txBytes)
			if err != nil {
				t.Errorf("Client error committing tx: %v", err)
			}
			if res.IsErr() {
				t.Errorf("Error committing tx. Code:%v result:%X log:%v",
					res.Code, res.Data, res.Log)
			}
		}
		res, err := appConnCon.CommitSync()
		if err != nil {
			t.Errorf("Client error committing: %v", err)
		}
		if len(res.Data) != 8 {
			t.Errorf("Error committing. Hash:%X", res.Data)
		}
	}

	//----------------------------------------

	// Deliver some txs.
	deliverTxsRange(0, 100)

	// Reap the txs.
	reapCheck(100)

	// Reap again. We should get the same amount.
	reapCheck(100)

	// Deliver 0 to 999; we should reap 900 new txs
	// because 100 were already counted.
	deliverTxsRange(0, 1000)

	// Reap the txs.
	reapCheck(1000)

	// Reap again. We should get the same amount.
	reapCheck(1000)

	// Commit from the consensus AppConn.
	commitRange(0, 500)
	updateRange(0, 500)

	// We should have 500 left.
	reapCheck(500)

	// Deliver 100 invalid txs and 100 valid txs.
	deliverTxsRange(900, 1100)

	// We should have 600 now.
	reapCheck(600)
}
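// TestCacheRemove checks that Push and Remove keep the cache's map and
// linked list in sync.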
func TestCacheRemove(t *testing.T) {
	cache := newMapTxCache(100)
	numTxs := 10
	txs := make([][]byte, numTxs)
	for i := 0; i < numTxs; i++ {
		// Probability of collision is 2**-256.
		txBytes := make([]byte, 32)
		rand.Read(txBytes)
		txs[i] = txBytes
		cache.Push(txBytes)
		// Make sure it's added to both the linked list and the map.
		require.Equal(t, i+1, len(cache.map_))
		require.Equal(t, i+1, cache.list.Len())
	}
	for i := 0; i < numTxs; i++ {
		cache.Remove(txs[i])
		// Make sure it's removed from both the map and the linked list.
		require.Equal(t, numTxs-(i+1), len(cache.map_))
		require.Equal(t, numTxs-(i+1), cache.list.Len())
	}
}
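// TestMempoolCloseWAL checks that writes reach the WAL while it is open
// and stop going through once CloseWAL has discarded it.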
func TestMempoolCloseWAL(t *testing.T) {
	// 1. Create the temporary directory for mempool and WAL testing.
	rootDir, err := ioutil.TempDir("", "mempool-test")
	require.Nil(t, err, "expecting successful tmpdir creation")
	defer os.RemoveAll(rootDir)

	// 2. Ensure that it doesn't contain any elements -- sanity check.
	m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 0, len(m1), "no matches yet")

	// 3. Create the mempool.
	wcfg := cfg.DefaultMempoolConfig()
	wcfg.RootDir = rootDir
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	appConnMem, _ := cc.NewABCIClient()
	mempool := NewMempool(wcfg, appConnMem, 10)
	mempool.InitWAL()

	// 4. Ensure that the directory contains the WAL file.
	m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m2), "expecting the wal match in the directory")

	// 5. Write some contents to the WAL.
	mempool.CheckTx(types.Tx([]byte("foo")), nil)
	walFilepath := mempool.wal.Path
	sum1 := checksumFile(walFilepath, t)

	// 6. Sanity check to ensure that the written TX matches the expectation.
	require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")

	// 7. Invoke CloseWAL() and ensure it discards the WAL,
	// so any subsequent write won't go through.
	require.True(t, mempool.CloseWAL(), "CloseWAL should close the WAL")
	mempool.CheckTx(types.Tx([]byte("bar")), nil)
	sum2 := checksumFile(walFilepath, t)
	require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")

	// 8. A second CloseWAL should do nothing.
	require.False(t, mempool.CloseWAL(), "second CloseWAL should be a no-op")

	// 9. Sanity check to ensure that the WAL file still exists.
	m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m3), "expecting the wal match in the directory")
}
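// BenchmarkCacheInsertTime measures the cost of pushing txs into the cache.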
func BenchmarkCacheInsertTime(b *testing.B) {
	cache := newMapTxCache(b.N)
	txs := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		txs[i] = make([]byte, 8)
		binary.BigEndian.PutUint64(txs[i], uint64(i))
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cache.Push(txs[i])
	}
}
// This benchmark is probably skewed, since in practice txs are removed
// in parallel, which adds overhead due to mutex locking.
func BenchmarkCacheRemoveTime(b *testing.B) {
	cache := newMapTxCache(b.N)
	txs := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		txs[i] = make([]byte, 8)
		binary.BigEndian.PutUint64(txs[i], uint64(i))
		cache.Push(txs[i])
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cache.Remove(txs[i])
	}
}
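// checksumIt returns the hex-encoded MD5 digest of data.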
func checksumIt(data []byte) string {
	h := md5.New()
	h.Write(data)
	return fmt.Sprintf("%x", h.Sum(nil))
}
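// checksumFile reads the file at p and returns its checksum, failing the
// test on a read error.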
func checksumFile(p string, t *testing.T) string {
	data, err := ioutil.ReadFile(p)
	require.Nil(t, err, "expecting successful read of %q", p)
	return checksumIt(data)
}