You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

407 lines
12 KiB

8 years ago
8 years ago
8 years ago
  1. package mempool
  2. import (
  3. "crypto/md5"
  4. "crypto/rand"
  5. "encoding/binary"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "path/filepath"
  10. "testing"
  11. "time"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. "github.com/tendermint/tendermint/abci/example/counter"
  15. "github.com/tendermint/tendermint/abci/example/kvstore"
  16. abci "github.com/tendermint/tendermint/abci/types"
  17. cfg "github.com/tendermint/tendermint/config"
  18. "github.com/tendermint/tendermint/libs/log"
  19. "github.com/tendermint/tendermint/proxy"
  20. "github.com/tendermint/tendermint/types"
  21. )
  22. func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
  23. config := cfg.ResetTestRoot("mempool_test")
  24. appConnMem, _ := cc.NewABCIClient()
  25. appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
  26. err := appConnMem.Start()
  27. if err != nil {
  28. panic(err)
  29. }
  30. mempool := NewMempool(config.Mempool, appConnMem, 0)
  31. mempool.SetLogger(log.TestingLogger())
  32. return mempool
  33. }
  // ensureNoFire fails the test immediately if ch yields a value (or is closed)
  // before timeoutMS milliseconds have elapsed; otherwise it returns once the
  // timeout expires.
  func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  	t.Helper() // report failures at the caller's line, not inside this helper

  	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  	defer timer.Stop() // release the timer promptly when ch wins the select

  	select {
  	case <-ch:
  		t.Fatal("Expected not to fire")
  	case <-timer.C:
  	}
  }
  // ensureFire waits up to timeoutMS milliseconds for ch to yield a value (or be
  // closed) and fails the test immediately if the timeout expires first.
  func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
  	t.Helper() // report failures at the caller's line, not inside this helper

  	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
  	defer timer.Stop() // release the timer promptly when ch wins the select

  	select {
  	case <-ch:
  	case <-timer.C:
  		t.Fatal("Expected to fire")
  	}
  }
  50. func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
  51. txs := make(types.Txs, count)
  52. for i := 0; i < count; i++ {
  53. txBytes := make([]byte, 20)
  54. txs[i] = txBytes
  55. _, err := rand.Read(txBytes)
  56. if err != nil {
  57. t.Error(err)
  58. }
  59. if err := mempool.CheckTx(txBytes, nil); err != nil {
  60. // Skip invalid txs.
  61. // TestMempoolFilters will fail otherwise. It asserts a number of txs
  62. // returned.
  63. if IsPreCheckError(err) {
  64. continue
  65. }
  66. t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
  67. }
  68. }
  69. return txs
  70. }
  71. func TestReapMaxBytesMaxGas(t *testing.T) {
  72. app := kvstore.NewKVStoreApplication()
  73. cc := proxy.NewLocalClientCreator(app)
  74. mempool := newMempoolWithApp(cc)
  75. // Ensure gas calculation behaves as expected
  76. checkTxs(t, mempool, 1)
  77. tx0 := mempool.TxsFront().Value.(*mempoolTx)
  78. // assert that kv store has gas wanted = 1.
  79. require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1")
  80. require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
  81. // ensure each tx is 20 bytes long
  82. require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
  83. mempool.Flush()
  84. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  85. // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
  86. tests := []struct {
  87. numTxsToCreate int
  88. maxBytes int64
  89. maxGas int64
  90. expectedNumTxs int
  91. }{
  92. {20, -1, -1, 20},
  93. {20, -1, 0, 0},
  94. {20, -1, 10, 10},
  95. {20, -1, 30, 20},
  96. {20, 0, -1, 0},
  97. {20, 0, 10, 0},
  98. {20, 10, 10, 0},
  99. {20, 22, 10, 1},
  100. {20, 220, -1, 10},
  101. {20, 220, 5, 5},
  102. {20, 220, 10, 10},
  103. {20, 220, 15, 10},
  104. {20, 20000, -1, 20},
  105. {20, 20000, 5, 5},
  106. {20, 20000, 30, 20},
  107. }
  108. for tcIndex, tt := range tests {
  109. checkTxs(t, mempool, tt.numTxsToCreate)
  110. got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
  111. assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
  112. len(got), tt.expectedNumTxs, tcIndex)
  113. mempool.Flush()
  114. }
  115. }
  116. func TestMempoolFilters(t *testing.T) {
  117. app := kvstore.NewKVStoreApplication()
  118. cc := proxy.NewLocalClientCreator(app)
  119. mempool := newMempoolWithApp(cc)
  120. emptyTxArr := []types.Tx{[]byte{}}
  121. nopPreFilter := func(tx types.Tx) error { return nil }
  122. nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }
  123. // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
  124. // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
  125. tests := []struct {
  126. numTxsToCreate int
  127. preFilter PreCheckFunc
  128. postFilter PostCheckFunc
  129. expectedNumTxs int
  130. }{
  131. {10, nopPreFilter, nopPostFilter, 10},
  132. {10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
  133. {10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
  134. {10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
  135. {10, nopPreFilter, PostCheckMaxGas(-1), 10},
  136. {10, nopPreFilter, PostCheckMaxGas(0), 0},
  137. {10, nopPreFilter, PostCheckMaxGas(1), 10},
  138. {10, nopPreFilter, PostCheckMaxGas(3000), 10},
  139. {10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
  140. {10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
  141. {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
  142. {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
  143. }
  144. for tcIndex, tt := range tests {
  145. mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
  146. checkTxs(t, mempool, tt.numTxsToCreate)
  147. require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
  148. mempool.Flush()
  149. }
  150. }
  151. func TestMempoolUpdateAddsTxsToCache(t *testing.T) {
  152. app := kvstore.NewKVStoreApplication()
  153. cc := proxy.NewLocalClientCreator(app)
  154. mempool := newMempoolWithApp(cc)
  155. mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
  156. err := mempool.CheckTx([]byte{0x01}, nil)
  157. if assert.Error(t, err) {
  158. assert.Equal(t, ErrTxInCache, err)
  159. }
  160. }
  161. func TestTxsAvailable(t *testing.T) {
  162. app := kvstore.NewKVStoreApplication()
  163. cc := proxy.NewLocalClientCreator(app)
  164. mempool := newMempoolWithApp(cc)
  165. mempool.EnableTxsAvailable()
  166. timeoutMS := 500
  167. // with no txs, it shouldnt fire
  168. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  169. // send a bunch of txs, it should only fire once
  170. txs := checkTxs(t, mempool, 100)
  171. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  172. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  173. // call update with half the txs.
  174. // it should fire once now for the new height
  175. // since there are still txs left
  176. committedTxs, txs := txs[:50], txs[50:]
  177. if err := mempool.Update(1, committedTxs, nil, nil); err != nil {
  178. t.Error(err)
  179. }
  180. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  181. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  182. // send a bunch more txs. we already fired for this height so it shouldnt fire again
  183. moreTxs := checkTxs(t, mempool, 50)
  184. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  185. // now call update with all the txs. it should not fire as there are no txs left
  186. committedTxs = append(txs, moreTxs...)
  187. if err := mempool.Update(2, committedTxs, nil, nil); err != nil {
  188. t.Error(err)
  189. }
  190. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  191. // send a bunch more txs, it should only fire once
  192. checkTxs(t, mempool, 100)
  193. ensureFire(t, mempool.TxsAvailable(), timeoutMS)
  194. ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
  195. }
  196. func TestSerialReap(t *testing.T) {
  197. app := counter.NewCounterApplication(true)
  198. app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
  199. cc := proxy.NewLocalClientCreator(app)
  200. mempool := newMempoolWithApp(cc)
  201. appConnCon, _ := cc.NewABCIClient()
  202. appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
  203. err := appConnCon.Start()
  204. require.Nil(t, err)
  205. cacheMap := make(map[string]struct{})
  206. deliverTxsRange := func(start, end int) {
  207. // Deliver some txs.
  208. for i := start; i < end; i++ {
  209. // This will succeed
  210. txBytes := make([]byte, 8)
  211. binary.BigEndian.PutUint64(txBytes, uint64(i))
  212. err := mempool.CheckTx(txBytes, nil)
  213. _, cached := cacheMap[string(txBytes)]
  214. if cached {
  215. require.NotNil(t, err, "expected error for cached tx")
  216. } else {
  217. require.Nil(t, err, "expected no err for uncached tx")
  218. }
  219. cacheMap[string(txBytes)] = struct{}{}
  220. // Duplicates are cached and should return error
  221. err = mempool.CheckTx(txBytes, nil)
  222. require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
  223. }
  224. }
  225. reapCheck := func(exp int) {
  226. txs := mempool.ReapMaxBytesMaxGas(-1, -1)
  227. require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
  228. }
  229. updateRange := func(start, end int) {
  230. txs := make([]types.Tx, 0)
  231. for i := start; i < end; i++ {
  232. txBytes := make([]byte, 8)
  233. binary.BigEndian.PutUint64(txBytes, uint64(i))
  234. txs = append(txs, txBytes)
  235. }
  236. if err := mempool.Update(0, txs, nil, nil); err != nil {
  237. t.Error(err)
  238. }
  239. }
  240. commitRange := func(start, end int) {
  241. // Deliver some txs.
  242. for i := start; i < end; i++ {
  243. txBytes := make([]byte, 8)
  244. binary.BigEndian.PutUint64(txBytes, uint64(i))
  245. res, err := appConnCon.DeliverTxSync(txBytes)
  246. if err != nil {
  247. t.Errorf("Client error committing tx: %v", err)
  248. }
  249. if res.IsErr() {
  250. t.Errorf("Error committing tx. Code:%v result:%X log:%v",
  251. res.Code, res.Data, res.Log)
  252. }
  253. }
  254. res, err := appConnCon.CommitSync()
  255. if err != nil {
  256. t.Errorf("Client error committing: %v", err)
  257. }
  258. if len(res.Data) != 8 {
  259. t.Errorf("Error committing. Hash:%X", res.Data)
  260. }
  261. }
  262. //----------------------------------------
  263. // Deliver some txs.
  264. deliverTxsRange(0, 100)
  265. // Reap the txs.
  266. reapCheck(100)
  267. // Reap again. We should get the same amount
  268. reapCheck(100)
  269. // Deliver 0 to 999, we should reap 900 new txs
  270. // because 100 were already counted.
  271. deliverTxsRange(0, 1000)
  272. // Reap the txs.
  273. reapCheck(1000)
  274. // Reap again. We should get the same amount
  275. reapCheck(1000)
  276. // Commit from the conensus AppConn
  277. commitRange(0, 500)
  278. updateRange(0, 500)
  279. // We should have 500 left.
  280. reapCheck(500)
  281. // Deliver 100 invalid txs and 100 valid txs
  282. deliverTxsRange(900, 1100)
  283. // We should have 600 now.
  284. reapCheck(600)
  285. }
  286. func TestCacheRemove(t *testing.T) {
  287. cache := newMapTxCache(100)
  288. numTxs := 10
  289. txs := make([][]byte, numTxs)
  290. for i := 0; i < numTxs; i++ {
  291. // probability of collision is 2**-256
  292. txBytes := make([]byte, 32)
  293. rand.Read(txBytes)
  294. txs[i] = txBytes
  295. cache.Push(txBytes)
  296. // make sure its added to both the linked list and the map
  297. require.Equal(t, i+1, len(cache.map_))
  298. require.Equal(t, i+1, cache.list.Len())
  299. }
  300. for i := 0; i < numTxs; i++ {
  301. cache.Remove(txs[i])
  302. // make sure its removed from both the map and the linked list
  303. require.Equal(t, numTxs-(i+1), len(cache.map_))
  304. require.Equal(t, numTxs-(i+1), cache.list.Len())
  305. }
  306. }
  307. func TestMempoolCloseWAL(t *testing.T) {
  308. // 1. Create the temporary directory for mempool and WAL testing.
  309. rootDir, err := ioutil.TempDir("", "mempool-test")
  310. require.Nil(t, err, "expecting successful tmpdir creation")
  311. defer os.RemoveAll(rootDir)
  312. // 2. Ensure that it doesn't contain any elements -- Sanity check
  313. m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
  314. require.Nil(t, err, "successful globbing expected")
  315. require.Equal(t, 0, len(m1), "no matches yet")
  316. // 3. Create the mempool
  317. wcfg := cfg.DefaultMempoolConfig()
  318. wcfg.RootDir = rootDir
  319. app := kvstore.NewKVStoreApplication()
  320. cc := proxy.NewLocalClientCreator(app)
  321. appConnMem, _ := cc.NewABCIClient()
  322. mempool := NewMempool(wcfg, appConnMem, 10)
  323. mempool.InitWAL()
  324. // 4. Ensure that the directory contains the WAL file
  325. m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
  326. require.Nil(t, err, "successful globbing expected")
  327. require.Equal(t, 1, len(m2), "expecting the wal match in")
  328. // 5. Write some contents to the WAL
  329. mempool.CheckTx(types.Tx([]byte("foo")), nil)
  330. walFilepath := mempool.wal.Path
  331. sum1 := checksumFile(walFilepath, t)
  332. // 6. Sanity check to ensure that the written TX matches the expectation.
  333. require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
  334. // 7. Invoke CloseWAL() and ensure it discards the
  335. // WAL thus any other write won't go through.
  336. mempool.CloseWAL()
  337. mempool.CheckTx(types.Tx([]byte("bar")), nil)
  338. sum2 := checksumFile(walFilepath, t)
  339. require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
  340. // 8. Sanity check to ensure that the WAL file still exists
  341. m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
  342. require.Nil(t, err, "successful globbing expected")
  343. require.Equal(t, 1, len(m3), "expecting the wal match in")
  344. }
  // checksumIt returns the MD5 digest of data as a lowercase hexadecimal string.
  func checksumIt(data []byte) string {
  	digest := md5.Sum(data)
  	return fmt.Sprintf("%x", digest)
  }
  350. func checksumFile(p string, t *testing.T) string {
  351. data, err := ioutil.ReadFile(p)
  352. require.Nil(t, err, "expecting successful read of %q", p)
  353. return checksumIt(data)
  354. }