Browse Source

mempool: reactor test

pull/644/head
Ethan Buchman 7 years ago
parent
commit
88138c38cf
5 changed files with 131 additions and 19 deletions
  1. +6
    -4
      mempool/mempool.go
  2. +8
    -10
      mempool/mempool_test.go
  3. +8
    -5
      mempool/reactor.go
  4. +108
    -0
      mempool/reactor_test.go
  5. +1
    -0
      p2p/connection.go

+ 6
- 4
mempool/mempool.go View File

@ -179,7 +179,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
},
})
}
return nil
return nil // TODO: return an error (?)
}
mem.cache.Push(tx)
// END CACHE
@ -216,21 +216,23 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
if r.CheckTx.Code == abci.CodeType_OK {
mem.counter++
memTx := &mempoolTx{
counter: mem.counter,
height: int64(mem.height),
tx: req.GetCheckTx().Tx,
tx: tx,
}
mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction", "tx", tx, "res", r)
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Bad Transaction", "res", r)
mem.logger.Info("Rejected bad transaction", "tx", tx, "res", r)
// remove from cache (it might be good later)
mem.cache.Remove(req.GetCheckTx().Tx)
mem.cache.Remove(tx)
// TODO: handle other retcodes
}


+ 8
- 10
mempool/mempool_test.go View File

@ -15,14 +15,12 @@ import (
"github.com/tendermint/tendermint/types"
)
func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool {
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
config := cfg.ResetTestRoot("mempool_test")
appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
if _, err := appConnMem.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error())
}
appConnMem.Start()
mempool := NewMempool(config.Mempool, appConnMem, 0)
mempool.SetLogger(log.TestingLogger())
return mempool
@ -46,7 +44,7 @@ func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) {
}
}
func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
txs := make(types.Txs, count)
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
@ -63,7 +61,7 @@ func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
func TestTxsAvailable(t *testing.T) {
app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(t, cc)
mempool := newMempoolWithApp(cc)
mempool.EnableTxsAvailable()
timeoutMS := 500
@ -72,7 +70,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch of txs, it should only fire once
txs := sendTxs(t, mempool, 100)
txs := checkTxs(t, mempool, 100)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
@ -85,7 +83,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch more txs. we already fired for this height so it shouldnt fire again
moreTxs := sendTxs(t, mempool, 50)
moreTxs := checkTxs(t, mempool, 50)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// now call update with all the txs. it should not fire as there are no txs left
@ -94,7 +92,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch more txs, it should only fire once
sendTxs(t, mempool, 100)
checkTxs(t, mempool, 100)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
@ -104,7 +102,7 @@ func TestSerialReap(t *testing.T) {
app.SetOption("serial", "on")
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(t, cc)
mempool := newMempoolWithApp(cc)
appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
if _, err := appConnCon.Start(); err != nil {


+ 8
- 5
mempool/reactor.go View File

@ -9,6 +9,7 @@ import (
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
@ -40,6 +41,12 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
return memR
}
// SetLogger sets the Logger on the reactor and the underlying Mempool.
func (memR *MempoolReactor) SetLogger(l log.Logger) {
memR.Logger = l
memR.Mempool.SetLogger(l)
}
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
@ -76,11 +83,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
case *TxMessage:
err := memR.Mempool.CheckTx(msg.Tx, nil)
if err != nil {
// Bad, seen, or conflicting tx.
memR.Logger.Info("Could not add tx", "tx", msg.Tx)
return
} else {
memR.Logger.Info("Added valid tx", "tx", msg.Tx)
memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err)
}
// broadcasting happens from go routines per peer
default:


+ 108
- 0
mempool/reactor_test.go View File

@ -0,0 +1,108 @@
package mempool
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/go-kit/kit/log/term"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
for i := 0; i < len(keyvals)-1; i += 2 {
if keyvals[i] == "validator" {
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
}
}
return term.FgBgColor{}
})
}
// connect N mempool reactors through N switches
func makeAndConnectMempoolReactors(config *cfg.Config, N int) []*MempoolReactor {
reactors := make([]*MempoolReactor, N)
logger := mempoolLogger()
for i := 0; i < N; i++ {
app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)
reactors[i] = NewMempoolReactor(config.Mempool, mempool) // so we dont start the consensus states
reactors[i].SetLogger(logger.With("validator", i))
}
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("MEMPOOL", reactors[i])
return s
}, p2p.Connect2Switches)
return reactors
}
// wait for all txs on all reactors
func waitForTxs(t *testing.T, txs types.Txs, reactors []*MempoolReactor) {
// wait for the txs in all mempools
wg := new(sync.WaitGroup)
for i := 0; i < len(reactors); i++ {
wg.Add(1)
go _waitForTxs(t, wg, txs, i, reactors)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
timer := time.After(TIMEOUT)
select {
case <-timer:
t.Fatal("Timed out waiting for txs")
case <-done:
}
}
// wait for all txs on a single mempool
func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int, reactors []*MempoolReactor) {
mempool := reactors[reactorIdx].Mempool
for mempool.Size() != len(txs) {
time.Sleep(time.Second)
}
reapedTxs := mempool.Reap(len(txs))
for i, tx := range txs {
assert.Equal(t, tx, reapedTxs[i], fmt.Sprintf("txs at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, tx, reapedTxs[i]))
}
wg.Done()
}
var (
NUM_TXS = 1000
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
)
func TestReactorBroadcastTxMessage(t *testing.T) {
config := cfg.TestConfig()
N := 4
reactors := makeAndConnectMempoolReactors(config, N)
// send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
waitForTxs(t, txs, reactors)
}

+ 1
- 0
p2p/connection.go View File

@ -471,6 +471,7 @@ FOR_LOOP:
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(pkt.ChannelID, msgBytes)
}
default:


Loading…
Cancel
Save