You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

538 lines
14 KiB

  1. package mempool
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "math/rand"
  8. "os"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. "github.com/stretchr/testify/require"
  16. abciclient "github.com/tendermint/tendermint/abci/client"
  17. "github.com/tendermint/tendermint/abci/example/code"
  18. "github.com/tendermint/tendermint/abci/example/kvstore"
  19. abci "github.com/tendermint/tendermint/abci/types"
  20. "github.com/tendermint/tendermint/config"
  21. "github.com/tendermint/tendermint/libs/log"
  22. "github.com/tendermint/tendermint/types"
  23. )
  24. // application extends the KV store application by overriding CheckTx to provide
  25. // transaction priority based on the value in the key/value pair.
  26. type application struct {
  27. *kvstore.Application
  28. }
  29. type testTx struct {
  30. tx types.Tx
  31. priority int64
  32. }
  33. func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
  34. var (
  35. priority int64
  36. sender string
  37. )
  38. // infer the priority from the raw transaction value (sender=key=value)
  39. parts := bytes.Split(req.Tx, []byte("="))
  40. if len(parts) == 3 {
  41. v, err := strconv.ParseInt(string(parts[2]), 10, 64)
  42. if err != nil {
  43. return abci.ResponseCheckTx{
  44. Priority: priority,
  45. Code: 100,
  46. GasWanted: 1,
  47. }
  48. }
  49. priority = v
  50. sender = string(parts[0])
  51. } else {
  52. return abci.ResponseCheckTx{
  53. Priority: priority,
  54. Code: 101,
  55. GasWanted: 1,
  56. }
  57. }
  58. return abci.ResponseCheckTx{
  59. Priority: priority,
  60. Sender: sender,
  61. Code: code.CodeTypeOK,
  62. GasWanted: 1,
  63. }
  64. }
  65. func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
  66. t.Helper()
  67. app := &application{kvstore.NewApplication()}
  68. cc := abciclient.NewLocalCreator(app)
  69. logger := log.TestingLogger()
  70. cfg, err := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
  71. require.NoError(t, err)
  72. cfg.Mempool.CacheSize = cacheSize
  73. appConnMem, err := cc(logger)
  74. require.NoError(t, err)
  75. require.NoError(t, appConnMem.Start())
  76. t.Cleanup(func() {
  77. os.RemoveAll(cfg.RootDir)
  78. require.NoError(t, appConnMem.Stop())
  79. })
  80. return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
  81. }
  82. func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
  83. txs := make([]testTx, numTxs)
  84. txInfo := TxInfo{SenderID: peerID}
  85. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  86. for i := 0; i < numTxs; i++ {
  87. prefix := make([]byte, 20)
  88. _, err := rng.Read(prefix)
  89. require.NoError(t, err)
  90. priority := int64(rng.Intn(9999-1000) + 1000)
  91. txs[i] = testTx{
  92. tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
  93. priority: priority,
  94. }
  95. require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
  96. }
  97. return txs
  98. }
  99. func convertTex(in []testTx) types.Txs {
  100. out := make([]types.Tx, len(in))
  101. for idx := range in {
  102. out[idx] = in[idx].tx
  103. }
  104. return out
  105. }
  106. func TestTxMempool_TxsAvailable(t *testing.T) {
  107. txmp := setup(t, 0)
  108. txmp.EnableTxsAvailable()
  109. ensureNoTxFire := func() {
  110. timer := time.NewTimer(500 * time.Millisecond)
  111. select {
  112. case <-txmp.TxsAvailable():
  113. require.Fail(t, "unexpected transactions event")
  114. case <-timer.C:
  115. }
  116. }
  117. ensureTxFire := func() {
  118. timer := time.NewTimer(500 * time.Millisecond)
  119. select {
  120. case <-txmp.TxsAvailable():
  121. case <-timer.C:
  122. require.Fail(t, "expected transactions event")
  123. }
  124. }
  125. // ensure no event as we have not executed any transactions yet
  126. ensureNoTxFire()
  127. // Execute CheckTx for some transactions and ensure TxsAvailable only fires
  128. // once.
  129. txs := checkTxs(t, txmp, 100, 0)
  130. ensureTxFire()
  131. ensureNoTxFire()
  132. rawTxs := make([]types.Tx, len(txs))
  133. for i, tx := range txs {
  134. rawTxs[i] = tx.tx
  135. }
  136. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  137. for i := 0; i < len(responses); i++ {
  138. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  139. }
  140. // commit half the transactions and ensure we fire an event
  141. txmp.Lock()
  142. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  143. txmp.Unlock()
  144. ensureTxFire()
  145. ensureNoTxFire()
  146. // Execute CheckTx for more transactions and ensure we do not fire another
  147. // event as we're still on the same height (1).
  148. _ = checkTxs(t, txmp, 100, 0)
  149. ensureNoTxFire()
  150. }
  151. func TestTxMempool_Size(t *testing.T) {
  152. txmp := setup(t, 0)
  153. txs := checkTxs(t, txmp, 100, 0)
  154. require.Equal(t, len(txs), txmp.Size())
  155. require.Equal(t, int64(5690), txmp.SizeBytes())
  156. rawTxs := make([]types.Tx, len(txs))
  157. for i, tx := range txs {
  158. rawTxs[i] = tx.tx
  159. }
  160. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  161. for i := 0; i < len(responses); i++ {
  162. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  163. }
  164. txmp.Lock()
  165. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  166. txmp.Unlock()
  167. require.Equal(t, len(rawTxs)/2, txmp.Size())
  168. require.Equal(t, int64(2850), txmp.SizeBytes())
  169. }
  170. func TestTxMempool_Flush(t *testing.T) {
  171. txmp := setup(t, 0)
  172. txs := checkTxs(t, txmp, 100, 0)
  173. require.Equal(t, len(txs), txmp.Size())
  174. require.Equal(t, int64(5690), txmp.SizeBytes())
  175. rawTxs := make([]types.Tx, len(txs))
  176. for i, tx := range txs {
  177. rawTxs[i] = tx.tx
  178. }
  179. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  180. for i := 0; i < len(responses); i++ {
  181. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  182. }
  183. txmp.Lock()
  184. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  185. txmp.Unlock()
  186. txmp.Flush()
  187. require.Zero(t, txmp.Size())
  188. require.Equal(t, int64(0), txmp.SizeBytes())
  189. }
  190. func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
  191. txmp := setup(t, 0)
  192. tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit
  193. require.Equal(t, len(tTxs), txmp.Size())
  194. require.Equal(t, int64(5690), txmp.SizeBytes())
  195. txMap := make(map[types.TxKey]testTx)
  196. priorities := make([]int64, len(tTxs))
  197. for i, tTx := range tTxs {
  198. txMap[tTx.tx.Key()] = tTx
  199. priorities[i] = tTx.priority
  200. }
  201. sort.Slice(priorities, func(i, j int) bool {
  202. // sort by priority, i.e. decreasing order
  203. return priorities[i] > priorities[j]
  204. })
  205. ensurePrioritized := func(reapedTxs types.Txs) {
  206. reapedPriorities := make([]int64, len(reapedTxs))
  207. for i, rTx := range reapedTxs {
  208. reapedPriorities[i] = txMap[rTx.Key()].priority
  209. }
  210. require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
  211. }
  212. // reap by gas capacity only
  213. reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
  214. ensurePrioritized(reapedTxs)
  215. require.Equal(t, len(tTxs), txmp.Size())
  216. require.Equal(t, int64(5690), txmp.SizeBytes())
  217. require.Len(t, reapedTxs, 50)
  218. // reap by transaction bytes only
  219. reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
  220. ensurePrioritized(reapedTxs)
  221. require.Equal(t, len(tTxs), txmp.Size())
  222. require.Equal(t, int64(5690), txmp.SizeBytes())
  223. require.GreaterOrEqual(t, len(reapedTxs), 16)
  224. // Reap by both transaction bytes and gas, where the size yields 31 reaped
  225. // transactions and the gas limit reaps 25 transactions.
  226. reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
  227. ensurePrioritized(reapedTxs)
  228. require.Equal(t, len(tTxs), txmp.Size())
  229. require.Equal(t, int64(5690), txmp.SizeBytes())
  230. require.Len(t, reapedTxs, 25)
  231. }
  232. func TestTxMempool_ReapMaxTxs(t *testing.T) {
  233. txmp := setup(t, 0)
  234. tTxs := checkTxs(t, txmp, 100, 0)
  235. require.Equal(t, len(tTxs), txmp.Size())
  236. require.Equal(t, int64(5690), txmp.SizeBytes())
  237. txMap := make(map[types.TxKey]testTx)
  238. priorities := make([]int64, len(tTxs))
  239. for i, tTx := range tTxs {
  240. txMap[tTx.tx.Key()] = tTx
  241. priorities[i] = tTx.priority
  242. }
  243. sort.Slice(priorities, func(i, j int) bool {
  244. // sort by priority, i.e. decreasing order
  245. return priorities[i] > priorities[j]
  246. })
  247. ensurePrioritized := func(reapedTxs types.Txs) {
  248. reapedPriorities := make([]int64, len(reapedTxs))
  249. for i, rTx := range reapedTxs {
  250. reapedPriorities[i] = txMap[rTx.Key()].priority
  251. }
  252. require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
  253. }
  254. // reap all transactions
  255. reapedTxs := txmp.ReapMaxTxs(-1)
  256. ensurePrioritized(reapedTxs)
  257. require.Equal(t, len(tTxs), txmp.Size())
  258. require.Equal(t, int64(5690), txmp.SizeBytes())
  259. require.Len(t, reapedTxs, len(tTxs))
  260. // reap a single transaction
  261. reapedTxs = txmp.ReapMaxTxs(1)
  262. ensurePrioritized(reapedTxs)
  263. require.Equal(t, len(tTxs), txmp.Size())
  264. require.Equal(t, int64(5690), txmp.SizeBytes())
  265. require.Len(t, reapedTxs, 1)
  266. // reap half of the transactions
  267. reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
  268. ensurePrioritized(reapedTxs)
  269. require.Equal(t, len(tTxs), txmp.Size())
  270. require.Equal(t, int64(5690), txmp.SizeBytes())
  271. require.Len(t, reapedTxs, len(tTxs)/2)
  272. }
  273. func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
  274. txmp := setup(t, 0)
  275. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  276. tx := make([]byte, txmp.config.MaxTxBytes+1)
  277. _, err := rng.Read(tx)
  278. require.NoError(t, err)
  279. require.Error(t, txmp.CheckTx(context.Background(), tx, nil, TxInfo{SenderID: 0}))
  280. tx = make([]byte, txmp.config.MaxTxBytes-1)
  281. _, err = rng.Read(tx)
  282. require.NoError(t, err)
  283. require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, TxInfo{SenderID: 0}))
  284. }
  285. func TestTxMempool_CheckTxSamePeer(t *testing.T) {
  286. txmp := setup(t, 100)
  287. peerID := uint16(1)
  288. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  289. prefix := make([]byte, 20)
  290. _, err := rng.Read(prefix)
  291. require.NoError(t, err)
  292. tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
  293. require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, TxInfo{SenderID: peerID}))
  294. require.Error(t, txmp.CheckTx(context.Background(), tx, nil, TxInfo{SenderID: peerID}))
  295. }
  296. func TestTxMempool_CheckTxSameSender(t *testing.T) {
  297. txmp := setup(t, 100)
  298. peerID := uint16(1)
  299. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  300. prefix1 := make([]byte, 20)
  301. _, err := rng.Read(prefix1)
  302. require.NoError(t, err)
  303. prefix2 := make([]byte, 20)
  304. _, err = rng.Read(prefix2)
  305. require.NoError(t, err)
  306. tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
  307. tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
  308. require.NoError(t, txmp.CheckTx(context.Background(), tx1, nil, TxInfo{SenderID: peerID}))
  309. require.Equal(t, 1, txmp.Size())
  310. require.NoError(t, txmp.CheckTx(context.Background(), tx2, nil, TxInfo{SenderID: peerID}))
  311. require.Equal(t, 1, txmp.Size())
  312. }
  313. func TestTxMempool_ConcurrentTxs(t *testing.T) {
  314. txmp := setup(t, 100)
  315. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  316. checkTxDone := make(chan struct{})
  317. var wg sync.WaitGroup
  318. wg.Add(1)
  319. go func() {
  320. for i := 0; i < 20; i++ {
  321. _ = checkTxs(t, txmp, 100, 0)
  322. dur := rng.Intn(1000-500) + 500
  323. time.Sleep(time.Duration(dur) * time.Millisecond)
  324. }
  325. wg.Done()
  326. close(checkTxDone)
  327. }()
  328. wg.Add(1)
  329. go func() {
  330. ticker := time.NewTicker(time.Second)
  331. defer ticker.Stop()
  332. defer wg.Done()
  333. var height int64 = 1
  334. for range ticker.C {
  335. reapedTxs := txmp.ReapMaxTxs(200)
  336. if len(reapedTxs) > 0 {
  337. responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
  338. for i := 0; i < len(responses); i++ {
  339. var code uint32
  340. if i%10 == 0 {
  341. code = 100
  342. } else {
  343. code = abci.CodeTypeOK
  344. }
  345. responses[i] = &abci.ResponseDeliverTx{Code: code}
  346. }
  347. txmp.Lock()
  348. require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil))
  349. txmp.Unlock()
  350. height++
  351. } else {
  352. // only return once we know we finished the CheckTx loop
  353. select {
  354. case <-checkTxDone:
  355. return
  356. default:
  357. }
  358. }
  359. }
  360. }()
  361. wg.Wait()
  362. require.Zero(t, txmp.Size())
  363. require.Zero(t, txmp.SizeBytes())
  364. }
  365. func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
  366. txmp := setup(t, 500)
  367. txmp.height = 100
  368. txmp.config.TTLNumBlocks = 10
  369. tTxs := checkTxs(t, txmp, 100, 0)
  370. require.Equal(t, len(tTxs), txmp.Size())
  371. require.Equal(t, 100, txmp.heightIndex.Size())
  372. // reap 5 txs at the next height -- no txs should expire
  373. reapedTxs := txmp.ReapMaxTxs(5)
  374. responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
  375. for i := 0; i < len(responses); i++ {
  376. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  377. }
  378. txmp.Lock()
  379. require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
  380. txmp.Unlock()
  381. require.Equal(t, 95, txmp.Size())
  382. require.Equal(t, 95, txmp.heightIndex.Size())
  383. // check more txs at height 101
  384. _ = checkTxs(t, txmp, 50, 1)
  385. require.Equal(t, 145, txmp.Size())
  386. require.Equal(t, 145, txmp.heightIndex.Size())
  387. // Reap 5 txs at a height that would expire all the transactions from before
  388. // the previous Update (height 100).
  389. //
  390. // NOTE: When we reap txs below, we do not know if we're picking txs from the
  391. // initial CheckTx calls or from the second round of CheckTx calls. Thus, we
  392. // cannot guarantee that all 95 txs are remaining that should be expired and
  393. // removed. However, we do know that that at most 95 txs can be expired and
  394. // removed.
  395. reapedTxs = txmp.ReapMaxTxs(5)
  396. responses = make([]*abci.ResponseDeliverTx, len(reapedTxs))
  397. for i := 0; i < len(responses); i++ {
  398. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  399. }
  400. txmp.Lock()
  401. require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
  402. txmp.Unlock()
  403. require.GreaterOrEqual(t, txmp.Size(), 45)
  404. require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
  405. }
  406. func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
  407. cases := []struct {
  408. name string
  409. err error
  410. }{
  411. {
  412. name: "error",
  413. err: errors.New("test error"),
  414. },
  415. {
  416. name: "no error",
  417. err: nil,
  418. },
  419. }
  420. for _, tc := range cases {
  421. testCase := tc
  422. t.Run(testCase.name, func(t *testing.T) {
  423. postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
  424. return testCase.err
  425. }
  426. txmp := setup(t, 0, WithPostCheck(postCheckFn))
  427. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  428. tx := make([]byte, txmp.config.MaxTxBytes-1)
  429. _, err := rng.Read(tx)
  430. require.NoError(t, err)
  431. callback := func(res *abci.Response) {
  432. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  433. require.True(t, ok)
  434. expectedErrString := ""
  435. if testCase.err != nil {
  436. expectedErrString = testCase.err.Error()
  437. }
  438. require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
  439. }
  440. require.NoError(t, txmp.CheckTx(context.Background(), tx, callback, TxInfo{SenderID: 0}))
  441. })
  442. }
  443. }