You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

527 lines
14 KiB

  1. package v1
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "math/rand"
  8. "os"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. "github.com/stretchr/testify/require"
  16. "github.com/tendermint/tendermint/abci/example/code"
  17. "github.com/tendermint/tendermint/abci/example/kvstore"
  18. abci "github.com/tendermint/tendermint/abci/types"
  19. "github.com/tendermint/tendermint/config"
  20. "github.com/tendermint/tendermint/internal/mempool"
  21. "github.com/tendermint/tendermint/libs/log"
  22. "github.com/tendermint/tendermint/proxy"
  23. "github.com/tendermint/tendermint/types"
  24. )
  25. // application extends the KV store application by overriding CheckTx to provide
  26. // transaction priority based on the value in the key/value pair.
  27. type application struct {
  28. *kvstore.Application
  29. }
  30. type testTx struct {
  31. tx types.Tx
  32. priority int64
  33. }
  34. func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
  35. var (
  36. priority int64
  37. sender string
  38. )
  39. // infer the priority from the raw transaction value (sender=key=value)
  40. parts := bytes.Split(req.Tx, []byte("="))
  41. if len(parts) == 3 {
  42. v, err := strconv.ParseInt(string(parts[2]), 10, 64)
  43. if err != nil {
  44. return abci.ResponseCheckTx{
  45. Priority: priority,
  46. Code: 100,
  47. GasWanted: 1,
  48. }
  49. }
  50. priority = v
  51. sender = string(parts[0])
  52. } else {
  53. return abci.ResponseCheckTx{
  54. Priority: priority,
  55. Code: 101,
  56. GasWanted: 1,
  57. }
  58. }
  59. return abci.ResponseCheckTx{
  60. Priority: priority,
  61. Sender: sender,
  62. Code: code.CodeTypeOK,
  63. GasWanted: 1,
  64. }
  65. }
  66. func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
  67. t.Helper()
  68. app := &application{kvstore.NewApplication()}
  69. cc := proxy.NewLocalClientCreator(app)
  70. cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
  71. cfg.Mempool.CacheSize = cacheSize
  72. appConnMem, err := cc.NewABCIClient()
  73. require.NoError(t, err)
  74. require.NoError(t, appConnMem.Start())
  75. t.Cleanup(func() {
  76. os.RemoveAll(cfg.RootDir)
  77. require.NoError(t, appConnMem.Stop())
  78. })
  79. return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
  80. }
  81. func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
  82. txs := make([]testTx, numTxs)
  83. txInfo := mempool.TxInfo{SenderID: peerID}
  84. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  85. for i := 0; i < numTxs; i++ {
  86. prefix := make([]byte, 20)
  87. _, err := rng.Read(prefix)
  88. require.NoError(t, err)
  89. priority := int64(rng.Intn(9999-1000) + 1000)
  90. txs[i] = testTx{
  91. tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
  92. priority: priority,
  93. }
  94. require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
  95. }
  96. return txs
  97. }
  98. func TestTxMempool_TxsAvailable(t *testing.T) {
  99. txmp := setup(t, 0)
  100. txmp.EnableTxsAvailable()
  101. ensureNoTxFire := func() {
  102. timer := time.NewTimer(500 * time.Millisecond)
  103. select {
  104. case <-txmp.TxsAvailable():
  105. require.Fail(t, "unexpected transactions event")
  106. case <-timer.C:
  107. }
  108. }
  109. ensureTxFire := func() {
  110. timer := time.NewTimer(500 * time.Millisecond)
  111. select {
  112. case <-txmp.TxsAvailable():
  113. case <-timer.C:
  114. require.Fail(t, "expected transactions event")
  115. }
  116. }
  117. // ensure no event as we have not executed any transactions yet
  118. ensureNoTxFire()
  119. // Execute CheckTx for some transactions and ensure TxsAvailable only fires
  120. // once.
  121. txs := checkTxs(t, txmp, 100, 0)
  122. ensureTxFire()
  123. ensureNoTxFire()
  124. rawTxs := make([]types.Tx, len(txs))
  125. for i, tx := range txs {
  126. rawTxs[i] = tx.tx
  127. }
  128. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  129. for i := 0; i < len(responses); i++ {
  130. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  131. }
  132. // commit half the transactions and ensure we fire an event
  133. txmp.Lock()
  134. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  135. txmp.Unlock()
  136. ensureTxFire()
  137. ensureNoTxFire()
  138. // Execute CheckTx for more transactions and ensure we do not fire another
  139. // event as we're still on the same height (1).
  140. _ = checkTxs(t, txmp, 100, 0)
  141. ensureNoTxFire()
  142. }
  143. func TestTxMempool_Size(t *testing.T) {
  144. txmp := setup(t, 0)
  145. txs := checkTxs(t, txmp, 100, 0)
  146. require.Equal(t, len(txs), txmp.Size())
  147. require.Equal(t, int64(5690), txmp.SizeBytes())
  148. rawTxs := make([]types.Tx, len(txs))
  149. for i, tx := range txs {
  150. rawTxs[i] = tx.tx
  151. }
  152. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  153. for i := 0; i < len(responses); i++ {
  154. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  155. }
  156. txmp.Lock()
  157. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  158. txmp.Unlock()
  159. require.Equal(t, len(rawTxs)/2, txmp.Size())
  160. require.Equal(t, int64(2850), txmp.SizeBytes())
  161. }
  162. func TestTxMempool_Flush(t *testing.T) {
  163. txmp := setup(t, 0)
  164. txs := checkTxs(t, txmp, 100, 0)
  165. require.Equal(t, len(txs), txmp.Size())
  166. require.Equal(t, int64(5690), txmp.SizeBytes())
  167. rawTxs := make([]types.Tx, len(txs))
  168. for i, tx := range txs {
  169. rawTxs[i] = tx.tx
  170. }
  171. responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
  172. for i := 0; i < len(responses); i++ {
  173. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  174. }
  175. txmp.Lock()
  176. require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
  177. txmp.Unlock()
  178. txmp.Flush()
  179. require.Zero(t, txmp.Size())
  180. require.Equal(t, int64(0), txmp.SizeBytes())
  181. }
  182. func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
  183. txmp := setup(t, 0)
  184. tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit
  185. require.Equal(t, len(tTxs), txmp.Size())
  186. require.Equal(t, int64(5690), txmp.SizeBytes())
  187. txMap := make(map[[mempool.TxKeySize]byte]testTx)
  188. priorities := make([]int64, len(tTxs))
  189. for i, tTx := range tTxs {
  190. txMap[mempool.TxKey(tTx.tx)] = tTx
  191. priorities[i] = tTx.priority
  192. }
  193. sort.Slice(priorities, func(i, j int) bool {
  194. // sort by priority, i.e. decreasing order
  195. return priorities[i] > priorities[j]
  196. })
  197. ensurePrioritized := func(reapedTxs types.Txs) {
  198. reapedPriorities := make([]int64, len(reapedTxs))
  199. for i, rTx := range reapedTxs {
  200. reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
  201. }
  202. require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
  203. }
  204. // reap by gas capacity only
  205. reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
  206. ensurePrioritized(reapedTxs)
  207. require.Equal(t, len(tTxs), txmp.Size())
  208. require.Equal(t, int64(5690), txmp.SizeBytes())
  209. require.Len(t, reapedTxs, 50)
  210. // reap by transaction bytes only
  211. reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
  212. ensurePrioritized(reapedTxs)
  213. require.Equal(t, len(tTxs), txmp.Size())
  214. require.Equal(t, int64(5690), txmp.SizeBytes())
  215. require.GreaterOrEqual(t, len(reapedTxs), 16)
  216. // Reap by both transaction bytes and gas, where the size yields 31 reaped
  217. // transactions and the gas limit reaps 25 transactions.
  218. reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
  219. ensurePrioritized(reapedTxs)
  220. require.Equal(t, len(tTxs), txmp.Size())
  221. require.Equal(t, int64(5690), txmp.SizeBytes())
  222. require.Len(t, reapedTxs, 25)
  223. }
  224. func TestTxMempool_ReapMaxTxs(t *testing.T) {
  225. txmp := setup(t, 0)
  226. tTxs := checkTxs(t, txmp, 100, 0)
  227. require.Equal(t, len(tTxs), txmp.Size())
  228. require.Equal(t, int64(5690), txmp.SizeBytes())
  229. txMap := make(map[[mempool.TxKeySize]byte]testTx)
  230. priorities := make([]int64, len(tTxs))
  231. for i, tTx := range tTxs {
  232. txMap[mempool.TxKey(tTx.tx)] = tTx
  233. priorities[i] = tTx.priority
  234. }
  235. sort.Slice(priorities, func(i, j int) bool {
  236. // sort by priority, i.e. decreasing order
  237. return priorities[i] > priorities[j]
  238. })
  239. ensurePrioritized := func(reapedTxs types.Txs) {
  240. reapedPriorities := make([]int64, len(reapedTxs))
  241. for i, rTx := range reapedTxs {
  242. reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
  243. }
  244. require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
  245. }
  246. // reap all transactions
  247. reapedTxs := txmp.ReapMaxTxs(-1)
  248. ensurePrioritized(reapedTxs)
  249. require.Equal(t, len(tTxs), txmp.Size())
  250. require.Equal(t, int64(5690), txmp.SizeBytes())
  251. require.Len(t, reapedTxs, len(tTxs))
  252. // reap a single transaction
  253. reapedTxs = txmp.ReapMaxTxs(1)
  254. ensurePrioritized(reapedTxs)
  255. require.Equal(t, len(tTxs), txmp.Size())
  256. require.Equal(t, int64(5690), txmp.SizeBytes())
  257. require.Len(t, reapedTxs, 1)
  258. // reap half of the transactions
  259. reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
  260. ensurePrioritized(reapedTxs)
  261. require.Equal(t, len(tTxs), txmp.Size())
  262. require.Equal(t, int64(5690), txmp.SizeBytes())
  263. require.Len(t, reapedTxs, len(tTxs)/2)
  264. }
  265. func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
  266. txmp := setup(t, 0)
  267. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  268. tx := make([]byte, txmp.config.MaxTxBytes+1)
  269. _, err := rng.Read(tx)
  270. require.NoError(t, err)
  271. require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
  272. tx = make([]byte, txmp.config.MaxTxBytes-1)
  273. _, err = rng.Read(tx)
  274. require.NoError(t, err)
  275. require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
  276. }
  277. func TestTxMempool_CheckTxSamePeer(t *testing.T) {
  278. txmp := setup(t, 100)
  279. peerID := uint16(1)
  280. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  281. prefix := make([]byte, 20)
  282. _, err := rng.Read(prefix)
  283. require.NoError(t, err)
  284. tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
  285. require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
  286. require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
  287. }
  288. func TestTxMempool_CheckTxSameSender(t *testing.T) {
  289. txmp := setup(t, 100)
  290. peerID := uint16(1)
  291. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  292. prefix1 := make([]byte, 20)
  293. _, err := rng.Read(prefix1)
  294. require.NoError(t, err)
  295. prefix2 := make([]byte, 20)
  296. _, err = rng.Read(prefix2)
  297. require.NoError(t, err)
  298. tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
  299. tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
  300. require.NoError(t, txmp.CheckTx(context.Background(), tx1, nil, mempool.TxInfo{SenderID: peerID}))
  301. require.Equal(t, 1, txmp.Size())
  302. require.NoError(t, txmp.CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: peerID}))
  303. require.Equal(t, 1, txmp.Size())
  304. }
  305. func TestTxMempool_ConcurrentTxs(t *testing.T) {
  306. txmp := setup(t, 100)
  307. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  308. checkTxDone := make(chan struct{})
  309. var wg sync.WaitGroup
  310. wg.Add(1)
  311. go func() {
  312. for i := 0; i < 20; i++ {
  313. _ = checkTxs(t, txmp, 100, 0)
  314. dur := rng.Intn(1000-500) + 500
  315. time.Sleep(time.Duration(dur) * time.Millisecond)
  316. }
  317. wg.Done()
  318. close(checkTxDone)
  319. }()
  320. wg.Add(1)
  321. go func() {
  322. ticker := time.NewTicker(time.Second)
  323. defer ticker.Stop()
  324. defer wg.Done()
  325. var height int64 = 1
  326. for range ticker.C {
  327. reapedTxs := txmp.ReapMaxTxs(200)
  328. if len(reapedTxs) > 0 {
  329. responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
  330. for i := 0; i < len(responses); i++ {
  331. var code uint32
  332. if i%10 == 0 {
  333. code = 100
  334. } else {
  335. code = abci.CodeTypeOK
  336. }
  337. responses[i] = &abci.ResponseDeliverTx{Code: code}
  338. }
  339. txmp.Lock()
  340. require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil))
  341. txmp.Unlock()
  342. height++
  343. } else {
  344. // only return once we know we finished the CheckTx loop
  345. select {
  346. case <-checkTxDone:
  347. return
  348. default:
  349. }
  350. }
  351. }
  352. }()
  353. wg.Wait()
  354. require.Zero(t, txmp.Size())
  355. require.Zero(t, txmp.SizeBytes())
  356. }
  357. func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
  358. txmp := setup(t, 500)
  359. txmp.height = 100
  360. txmp.config.TTLNumBlocks = 10
  361. tTxs := checkTxs(t, txmp, 100, 0)
  362. require.Equal(t, len(tTxs), txmp.Size())
  363. require.Equal(t, 100, txmp.heightIndex.Size())
  364. // reap 5 txs at the next height -- no txs should expire
  365. reapedTxs := txmp.ReapMaxTxs(5)
  366. responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
  367. for i := 0; i < len(responses); i++ {
  368. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  369. }
  370. txmp.Lock()
  371. require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
  372. txmp.Unlock()
  373. require.Equal(t, 95, txmp.Size())
  374. require.Equal(t, 95, txmp.heightIndex.Size())
  375. // check more txs at height 101
  376. _ = checkTxs(t, txmp, 50, 1)
  377. require.Equal(t, 145, txmp.Size())
  378. require.Equal(t, 145, txmp.heightIndex.Size())
  379. // Reap 5 txs at a height that would expire all the transactions from before
  380. // the previous Update (height 100).
  381. //
  382. // NOTE: When we reap txs below, we do not know if we're picking txs from the
  383. // initial CheckTx calls or from the second round of CheckTx calls. Thus, we
  384. // cannot guarantee that all 95 txs are remaining that should be expired and
  385. // removed. However, we do know that that at most 95 txs can be expired and
  386. // removed.
  387. reapedTxs = txmp.ReapMaxTxs(5)
  388. responses = make([]*abci.ResponseDeliverTx, len(reapedTxs))
  389. for i := 0; i < len(responses); i++ {
  390. responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
  391. }
  392. txmp.Lock()
  393. require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
  394. txmp.Unlock()
  395. require.GreaterOrEqual(t, txmp.Size(), 45)
  396. require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
  397. }
  398. func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
  399. cases := []struct {
  400. name string
  401. err error
  402. }{
  403. {
  404. name: "error",
  405. err: errors.New("test error"),
  406. },
  407. {
  408. name: "no error",
  409. err: nil,
  410. },
  411. }
  412. for _, tc := range cases {
  413. testCase := tc
  414. t.Run(testCase.name, func(t *testing.T) {
  415. postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
  416. return testCase.err
  417. }
  418. txmp := setup(t, 0, WithPostCheck(postCheckFn))
  419. rng := rand.New(rand.NewSource(time.Now().UnixNano()))
  420. tx := make([]byte, txmp.config.MaxTxBytes-1)
  421. _, err := rng.Read(tx)
  422. require.NoError(t, err)
  423. callback := func(res *abci.Response) {
  424. checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
  425. require.True(t, ok)
  426. expectedErrString := ""
  427. if testCase.err != nil {
  428. expectedErrString = testCase.err.Error()
  429. }
  430. require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
  431. }
  432. require.NoError(t, txmp.CheckTx(context.Background(), tx, callback, mempool.TxInfo{SenderID: 0}))
  433. })
  434. }
  435. }