Browse Source

Merge branch 'master' into wb/rebuild-synchrony-params

pull/8142/head
William Banfield 2 years ago
committed by GitHub
parent
commit
15d1836b54
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 54 deletions
  1. +0
    -16
      internal/blocksync/pool.go
  2. +0
    -2
      internal/blocksync/reactor.go
  3. +9
    -1
      internal/mempool/mempool_bench_test.go
  4. +79
    -26
      internal/mempool/mempool_test.go
  5. +25
    -9
      internal/mempool/reactor_test.go

+ 0
- 16
internal/blocksync/pool.go View File

@ -86,7 +86,6 @@ type BlockPool struct {
requestsCh chan<- BlockRequest
errorsCh chan<- peerError
exitedCh chan struct{}
startHeight int64
lastHundredBlockTimeStamp time.Time
@ -109,7 +108,6 @@ func NewBlockPool(
height: start,
startHeight: start,
numPending: 0,
exitedCh: make(chan struct{}),
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
@ -125,11 +123,6 @@ func (pool *BlockPool) OnStart(ctx context.Context) error {
pool.lastHundredBlockTimeStamp = pool.lastAdvance
go pool.makeRequestersRoutine(ctx)
go func() {
defer close(pool.exitedCh)
pool.Wait()
}()
return nil
}
@ -637,12 +630,6 @@ func (bpr *bpRequester) redo(peerID types.NodeID) {
// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func (bpr *bpRequester) requestRoutine(ctx context.Context) {
bprPoolDone := make(chan struct{})
go func() {
defer close(bprPoolDone)
bpr.pool.Wait()
}()
OUTER_LOOP:
for {
// Pick a peer to send request to.
@ -670,9 +657,6 @@ OUTER_LOOP:
select {
case <-ctx.Done():
return
case <-bpr.pool.exitedCh:
bpr.Stop()
return
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()


+ 0
- 2
internal/blocksync/reactor.go View File

@ -445,8 +445,6 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
select {
case <-ctx.Done():
return
case <-r.pool.exitedCh:
return
case <-switchToConsensusTicker.C:
var (
height, numPending, lenRequesters = r.pool.GetStatus()


+ 9
- 1
internal/mempool/mempool_bench_test.go View File

@ -8,15 +8,23 @@ import (
"time"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/libs/log"
)
func BenchmarkTxMempool_CheckTx(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := abciclient.NewLocalClient(log.NewNopLogger(), kvstore.NewApplication())
if err := client.Start(ctx); err != nil {
b.Fatal(err)
}
// setup the cache and the mempool number for hitting GetEvictableTxs during the
// benchmark. 5000 is the current default mempool size in the TM config.
txmp := setup(ctx, b, 10000)
txmp := setup(ctx, b, client, 10000)
txmp.config.Size = 5000
rng := rand.New(rand.NewSource(time.Now().UnixNano()))


+ 79
- 26
internal/mempool/mempool_test.go View File

@ -72,30 +72,18 @@ func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}
func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
func setup(ctx context.Context, t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
t.Helper()
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
logger := log.TestingLogger()
conn := abciclient.NewLocalClient(logger, &application{
kvstore.NewApplication(),
})
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err)
cfg.Mempool.CacheSize = cacheSize
require.NoError(t, conn.Start(ctx))
t.Cleanup(func() {
os.RemoveAll(cfg.RootDir)
cancel()
conn.Wait()
})
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...)
}
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
@ -137,7 +125,13 @@ func TestTxMempool_TxsAvailable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txmp.EnableTxsAvailable()
ensureNoTxFire := func() {
@ -194,7 +188,13 @@ func TestTxMempool_Size(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -221,7 +221,13 @@ func TestTxMempool_Flush(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
txs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(txs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -249,7 +255,13 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0) // all txs request 1 gas unit
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -302,7 +314,13 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
tTxs := checkTxs(ctx, t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())
@ -354,7 +372,12 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 0)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 0)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes+1)
@ -374,7 +397,13 @@ func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -392,7 +421,13 @@ func TestTxMempool_CheckTxSameSender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
peerID := uint16(1)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -417,7 +452,13 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 100)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 100)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
checkTxDone := make(chan struct{})
@ -484,7 +525,13 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txmp := setup(ctx, t, 500)
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
txmp := setup(ctx, t, client, 500)
txmp.height = 100
txmp.config.TTLNumBlocks = 10
@ -556,10 +603,16 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
return testCase.err
}
txmp := setup(ctx, t, 0, WithPostCheck(postCheckFn))
txmp := setup(ctx, t, client, 0, WithPostCheck(postCheckFn))
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
tx := make([]byte, txmp.config.MaxTxBytes-1)
_, err := rng.Read(tx)


+ 25
- 9
internal/mempool/reactor_test.go View File

@ -13,6 +13,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
@ -39,7 +40,7 @@ type reactorTestSuite struct {
nodes []types.NodeID
}
func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNodes int, chBuf uint) *reactorTestSuite {
t.Helper()
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
@ -63,7 +64,11 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()
mempool := setup(ctx, t, 0)
client := abciclient.NewLocalClient(logger, rts.kvstores[nodeID])
require.NoError(t, client.Start(ctx))
t.Cleanup(client.Wait)
mempool := setup(ctx, t, client, 0)
rts.mempools[nodeID] = mempool
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
@ -151,7 +156,9 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
defer cancel()
const numNodes = 2
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
observePanic := func(r interface{}) {
t.Fatal("panic detected in reactor")
@ -194,7 +201,9 @@ func TestReactorBroadcastTxs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, uint(numTxs))
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
primary := rts.nodes[0]
secondaries := rts.nodes[1:]
@ -218,7 +227,8 @@ func TestReactorConcurrency(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -276,7 +286,8 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, uint(numTxs))
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, uint(numTxs))
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -300,7 +311,9 @@ func TestReactor_MaxTxBytes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, numNodes, 0)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, numNodes, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
@ -336,7 +349,8 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, 1, MaxActiveIDs+1)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, 1, MaxActiveIDs+1)
nodeID := rts.nodes[0]
@ -388,7 +402,9 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setupReactors(ctx, t, 2, 2)
logger := log.NewNopLogger()
rts := setupReactors(ctx, t, logger, 2, 2)
primary := rts.nodes[0]
secondary := rts.nodes[1]


Loading…
Cancel
Save