package mempool

import (
	"crypto/md5"
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	amino "github.com/tendermint/go-amino"

	"github.com/tendermint/tendermint/abci/example/counter"
	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)
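
// newMempoolWithApp creates a fresh test mempool wired to the given ABCI app
// via a dedicated "mempool" ABCI client connection, using a throwaway test config.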
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
	config := cfg.ResetTestRoot("mempool_test")

	appConnMem, _ := cc.NewABCIClient()
	appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
	err := appConnMem.Start()
	if err != nil {
		panic(err)
	}
	mempool := NewMempool(config.Mempool, appConnMem, 0)
	mempool.SetLogger(log.TestingLogger())
	return mempool
}
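
// ensureNoFire fails the test if ch fires within timeoutMS milliseconds.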
func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	select {
	case <-ch:
		t.Fatal("Expected not to fire")
	case <-timer.C:
	}
}
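
// ensureFire fails the test if ch does not fire within timeoutMS milliseconds.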
func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	select {
	case <-ch:
	case <-timer.C:
		t.Fatal("Expected to fire")
	}
}
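
// checkTxs generates count random 20-byte txs, submits each one via CheckTx,
// and returns them; it fails the test if any CheckTx call errors.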
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
	txs := make(types.Txs, count)
	for i := 0; i < count; i++ {
		txBytes := make([]byte, 20)
		txs[i] = txBytes
		_, err := rand.Read(txBytes)
		if err != nil {
			t.Error(err)
		}
		if err := mempool.CheckTx(txBytes, nil); err != nil {
			t.Fatalf("Error after CheckTx: %v", err)
		}
	}
	return txs
}
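
// TestReapMaxBytesMaxGas exercises ReapMaxBytesMaxGas against the kvstore app
// with a table of maxBytes/maxGas limits and the expected number of reaped txs.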
func TestReapMaxBytesMaxGas(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)

	// Ensure gas calculation behaves as expected
	checkTxs(t, mempool, 1)
	tx0 := mempool.TxsFront().Value.(*mempoolTx)
	// assert that kv store has gas wanted = 1.
	require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value not equal to 1")
	require.Equal(t, tx0.gasWanted, int64(1), "transaction's gas was set incorrectly")
	// ensure each tx is 20 bytes long
	require.Equal(t, len(tx0.tx), 20, "Tx is not 20 bytes long")
	mempool.Flush()

	// each table-driven test creates numTxsToCreate txs with CheckTx, and at the end clears all remaining txs.
	// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
	tests := []struct {
		numTxsToCreate int
		maxBytes       int64
		maxGas         int64
		expectedNumTxs int
	}{
		{20, -1, -1, 20},
		{20, -1, 0, 0},
		{20, -1, 10, 10},
		{20, -1, 30, 20},
		{20, 0, -1, 0},
		{20, 0, 10, 0},
		{20, 10, 10, 0},
		{20, 21, 10, 1},
		{20, 210, -1, 10},
		{20, 210, 5, 5},
		{20, 210, 10, 10},
		{20, 210, 15, 10},
		{20, 20000, -1, 20},
		{20, 20000, 5, 5},
		{20, 20000, 30, 20},
	}
	for tcIndex, tt := range tests {
		checkTxs(t, mempool, tt.numTxsToCreate)
		got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
		assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
			len(got), tt.expectedNumTxs, tcIndex)
		mempool.Flush()
	}
}
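
// TestMempoolFilters checks that the pre- and post-CheckTx filters installed via
// Update control which txs are admitted into the mempool.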
func TestMempoolFilters(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)
	emptyTxArr := []types.Tx{[]byte{}}

	nopPreFilter := func(tx types.Tx) bool { return true }
	nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) bool { return true }

	// This is the same filter we expect to be used within node/node.go and state/execution.go
	nBytePreFilter := func(n int) func(tx types.Tx) bool {
		return func(tx types.Tx) bool {
			// We have to account for the amino overhead in the tx size as well
			aminoOverhead := amino.UvarintSize(uint64(len(tx)))
			return (len(tx) + aminoOverhead) <= n
		}
	}

	nGasPostFilter := func(n int64) func(tx types.Tx, res *abci.ResponseCheckTx) bool {
		return func(tx types.Tx, res *abci.ResponseCheckTx) bool {
			if n == -1 {
				return true
			}
			return res.GasWanted <= n
		}
	}

	// each table-driven test creates numTxsToCreate txs with CheckTx, and at the end clears all remaining txs.
	// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
	tests := []struct {
		numTxsToCreate int
		preFilter      func(tx types.Tx) bool
		postFilter     func(tx types.Tx, res *abci.ResponseCheckTx) bool
		expectedNumTxs int
	}{
		{10, nopPreFilter, nopPostFilter, 10},
		{10, nBytePreFilter(10), nopPostFilter, 0},
		{10, nBytePreFilter(20), nopPostFilter, 0},
		{10, nBytePreFilter(21), nopPostFilter, 10},
		{10, nopPreFilter, nGasPostFilter(-1), 10},
		{10, nopPreFilter, nGasPostFilter(0), 0},
		{10, nopPreFilter, nGasPostFilter(1), 10},
		{10, nopPreFilter, nGasPostFilter(3000), 10},
		{10, nBytePreFilter(10), nGasPostFilter(20), 0},
		{10, nBytePreFilter(30), nGasPostFilter(20), 10},
		{10, nBytePreFilter(21), nGasPostFilter(1), 10},
		{10, nBytePreFilter(21), nGasPostFilter(0), 0},
	}
	for tcIndex, tt := range tests {
		mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
		checkTxs(t, mempool, tt.numTxsToCreate)
		require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
		mempool.Flush()
	}
}
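
// TestTxsAvailable verifies that the TxsAvailable channel fires at most once per
// height, and only while there are still uncommitted txs in the mempool.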
func TestTxsAvailable(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)
	mempool.EnableTxsAvailable()

	timeoutMS := 500

	// with no txs, it shouldn't fire
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch of txs, it should only fire once
	txs := checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// call update with half the txs.
	// it should fire once now for the new height
	// since there are still txs left
	committedTxs, txs := txs[:50], txs[50:]
	if err := mempool.Update(1, committedTxs, nil, nil); err != nil {
		t.Error(err)
	}
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch more txs. we already fired for this height so it shouldn't fire again
	moreTxs := checkTxs(t, mempool, 50)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// now call update with all the txs. it should not fire as there are no txs left
	committedTxs = append(txs, moreTxs...)
	if err := mempool.Update(2, committedTxs, nil, nil); err != nil {
		t.Error(err)
	}
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch more txs, it should only fire once
	checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
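
// TestSerialReap runs the counter app in serial mode and interleaves CheckTx,
// ReapMaxBytesMaxGas, Update, and commits made over a separate consensus connection.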
func TestSerialReap(t *testing.T) {
	app := counter.NewCounterApplication(true)
	app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
	cc := proxy.NewLocalClientCreator(app)

	mempool := newMempoolWithApp(cc)

	appConnCon, _ := cc.NewABCIClient()
	appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
	err := appConnCon.Start()
	require.Nil(t, err)

	cacheMap := make(map[string]struct{})
	deliverTxsRange := func(start, end int) {
		// Deliver some txs.
		for i := start; i < end; i++ {
			// CheckTx should succeed unless the tx is already in the cache
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			err := mempool.CheckTx(txBytes, nil)
			_, cached := cacheMap[string(txBytes)]
			if cached {
				require.NotNil(t, err, "expected error for cached tx")
			} else {
				require.Nil(t, err, "expected no err for uncached tx")
			}
			cacheMap[string(txBytes)] = struct{}{}

			// Duplicates are cached and should return error
			err = mempool.CheckTx(txBytes, nil)
			require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
		}
	}

	reapCheck := func(exp int) {
		txs := mempool.ReapMaxBytesMaxGas(-1, -1)
		require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
	}

	updateRange := func(start, end int) {
		txs := make([]types.Tx, 0)
		for i := start; i < end; i++ {
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			txs = append(txs, txBytes)
		}
		if err := mempool.Update(0, txs, nil, nil); err != nil {
			t.Error(err)
		}
	}

	commitRange := func(start, end int) {
		// Deliver some txs.
		for i := start; i < end; i++ {
			txBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(txBytes, uint64(i))
			res, err := appConnCon.DeliverTxSync(txBytes)
			if err != nil {
				t.Errorf("Client error committing tx: %v", err)
			}
			if res.IsErr() {
				t.Errorf("Error committing tx. Code:%v result:%X log:%v",
					res.Code, res.Data, res.Log)
			}
		}
		res, err := appConnCon.CommitSync()
		if err != nil {
			t.Errorf("Client error committing: %v", err)
		}
		if len(res.Data) != 8 {
			t.Errorf("Error committing. Hash:%X", res.Data)
		}
	}

	//----------------------------------------

	// Deliver some txs.
	deliverTxsRange(0, 100)

	// Reap the txs.
	reapCheck(100)

	// Reap again. We should get the same amount
	reapCheck(100)

	// Deliver 0 to 999, we should reap 900 new txs
	// because 100 were already counted.
	deliverTxsRange(0, 1000)

	// Reap the txs.
	reapCheck(1000)

	// Reap again. We should get the same amount
	reapCheck(1000)

	// Commit from the consensus AppConn
	commitRange(0, 500)
	updateRange(0, 500)

	// We should have 500 left.
	reapCheck(500)

	// Deliver 100 invalid txs and 100 valid txs
	deliverTxsRange(900, 1100)

	// We should have 600 now.
	reapCheck(600)
}
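
// TestCacheRemove verifies that Push and Remove keep the cache's map and
// linked list in sync.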
func TestCacheRemove(t *testing.T) {
	cache := newMapTxCache(100)
	numTxs := 10
	txs := make([][]byte, numTxs)
	for i := 0; i < numTxs; i++ {
		// probability of collision is 2**-256
		txBytes := make([]byte, 32)
		rand.Read(txBytes)
		txs[i] = txBytes
		cache.Push(txBytes)
		// make sure it's added to both the linked list and the map
		require.Equal(t, i+1, len(cache.map_))
		require.Equal(t, i+1, cache.list.Len())
	}
	for i := 0; i < numTxs; i++ {
		cache.Remove(txs[i])
		// make sure it's removed from both the map and the linked list
		require.Equal(t, numTxs-(i+1), len(cache.map_))
		require.Equal(t, numTxs-(i+1), cache.list.Len())
	}
}
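
// TestMempoolCloseWAL verifies that the WAL file is created, that CheckTx writes
// to it, and that no further writes happen after CloseWAL.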
func TestMempoolCloseWAL(t *testing.T) {
	// 1. Create the temporary directory for mempool and WAL testing.
	rootDir, err := ioutil.TempDir("", "mempool-test")
	require.Nil(t, err, "expecting successful tmpdir creation")
	defer os.RemoveAll(rootDir)

	// 2. Ensure that it doesn't contain any elements -- Sanity check
	m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 0, len(m1), "no matches yet")

	// 3. Create the mempool
	wcfg := cfg.DefaultMempoolConfig()
	wcfg.RootDir = rootDir
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	appConnMem, _ := cc.NewABCIClient()
	mempool := NewMempool(wcfg, appConnMem, 10)
	mempool.InitWAL()

	// 4. Ensure that the directory contains the WAL file
	m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m2), "expecting exactly one match (the WAL file)")

	// 5. Write some contents to the WAL
	mempool.CheckTx(types.Tx([]byte("foo")), nil)
	walFilepath := mempool.wal.Path
	sum1 := checksumFile(walFilepath, t)

	// 6. Sanity check to ensure that the written TX matches the expectation.
	require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")

	// 7. Invoke CloseWAL() and ensure it discards the
	// WAL, so that any subsequent write won't go through.
	require.True(t, mempool.CloseWAL(), "CloseWAL should succeed on the first invocation")
	mempool.CheckTx(types.Tx([]byte("bar")), nil)
	sum2 := checksumFile(walFilepath, t)
	require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")

	// 8. Second CloseWAL should do nothing
	require.False(t, mempool.CloseWAL(), "CloseWAL should be a no-op on the second invocation")

	// 9. Sanity check to ensure that the WAL file still exists
	m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m3), "expecting the WAL file to still exist")
}
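
// BenchmarkCacheInsertTime measures the cost of Push on a pre-sized tx cache.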
func BenchmarkCacheInsertTime(b *testing.B) {
	cache := newMapTxCache(b.N)
	txs := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		txs[i] = make([]byte, 8)
		binary.BigEndian.PutUint64(txs[i], uint64(i))
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cache.Push(txs[i])
	}
}

// This benchmark is probably skewed, since we actually will be removing
// txs in parallel, which may cause some overhead due to mutex locking.
func BenchmarkCacheRemoveTime(b *testing.B) {
	cache := newMapTxCache(b.N)
	txs := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		txs[i] = make([]byte, 8)
		binary.BigEndian.PutUint64(txs[i], uint64(i))
		cache.Push(txs[i])
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cache.Remove(txs[i])
	}
}
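
// checksumIt returns the hex-encoded MD5 checksum of data.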
func checksumIt(data []byte) string {
	h := md5.New()
	h.Write(data)
	return fmt.Sprintf("%x", h.Sum(nil))
}
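
// checksumFile reads the file at p and returns its MD5 checksum, failing the
// test if the read errors.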
func checksumFile(p string, t *testing.T) string {
	data, err := ioutil.ReadFile(p)
	require.Nil(t, err, "expecting successful read of %q", p)
	return checksumIt(data)
}