@ -0,0 +1,202 @@ | |||
package evpool | |||
import ( | |||
"container/list" | |||
"fmt" | |||
"sync" | |||
"sync/atomic" | |||
"github.com/tendermint/tmlibs/log" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
const cacheSize = 100000 | |||
// EvidencePool maintains a set of valid uncommitted evidence. | |||
type EvidencePool struct { | |||
config *cfg.EvidencePoolConfig | |||
mtx sync.Mutex | |||
height int // the last block Update()'d to | |||
evidence types.Evidences | |||
// TODO: evidenceCache | |||
// TODO: need to persist evidence so we never lose it | |||
logger log.Logger | |||
} | |||
func NewEvidencePool(config *cfg.EvidencePoolConfig, height int) *EvidencePool { | |||
evpool := &EvidencePool{ | |||
config: config, | |||
height: height, | |||
logger: log.NewNopLogger(), | |||
} | |||
evpool.initWAL() | |||
return evpool | |||
} | |||
// SetLogger sets the Logger. | |||
func (evpool *EvidencePool) SetLogger(l log.Logger) { | |||
evpool.logger = l | |||
} | |||
// Evidence returns a copy of the pool's evidence. | |||
func (evpool *EvidencePool) Evidence() types.Evidences { | |||
evpool.mtx.Lock() | |||
defer evpool.mtx.Unlock() | |||
evCopy := make(types.Evidences, len(evpool.evidence)) | |||
for i, ev := range evpool.evidence { | |||
evCopy[i] = ev | |||
} | |||
return evCopy | |||
} | |||
// Size returns the number of pieces of evidence in the evpool. | |||
func (evpool *EvidencePool) Size() int { | |||
evpool.mtx.Lock() | |||
defer evpool.mtx.Unlock() | |||
return len(evpool.evidence) | |||
} | |||
// Flush removes all evidence from the evpool | |||
func (evpool *EvidencePool) Flush() { | |||
evpool.mtx.Lock() | |||
defer evpool.mtx.Unlock() | |||
evpool.evidence = make(types.Evidence) | |||
} | |||
// AddEvidence checks the evidence is valid and adds it to the pool. | |||
func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { | |||
evpool.mtx.Lock() | |||
defer evpool.mtx.Unlock() | |||
if evpool.evidence.Has(evidence) { | |||
return fmt.Errorf("Evidence already exists", "evidence", evidence) | |||
} | |||
cs.Logger.Info("Found conflicting vote. Recording evidence", "evidence", ev) | |||
evpool.evidence = append(evpool.evidence, ev) | |||
// TODO: write to disk ? WAL ? | |||
return nil | |||
} | |||
// Update informs the evpool that the given evidence was committed and can be discarded. | |||
// NOTE: this should be called *after* block is committed by consensus. | |||
func (evpool *EvidencePool) Update(height int, evidence types.Evidences) { | |||
// First, create a lookup map of txns in new txs. | |||
evMap := make(map[string]struct{}) | |||
for _, ev := range evidence { | |||
evMap[string(evidence.Hash())] = struct{}{} | |||
} | |||
// Set height | |||
evpool.height = height | |||
// Remove evidence that is already committed . | |||
goodEvidence := evpool.filterEvidence(evMap) | |||
_ = goodEvidence | |||
} | |||
// TODO: | |||
func (evpool *EvidencePool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { | |||
goodTxs := make([]types.Tx, 0, evpool.txs.Len()) | |||
for e := evpool.txs.Front(); e != nil; e = e.Next() { | |||
memTx := e.Value.(*evpoolTx) | |||
// Remove the tx if it's alredy in a block. | |||
if _, ok := blockTxsMap[string(memTx.tx)]; ok { | |||
// remove from clist | |||
evpool.txs.Remove(e) | |||
e.DetachPrev() | |||
// NOTE: we don't remove committed txs from the cache. | |||
continue | |||
} | |||
// Good tx! | |||
goodTxs = append(goodTxs, memTx.tx) | |||
} | |||
return goodTxs | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// evpoolTx is a transaction that successfully ran | |||
type evpoolEvidence struct { | |||
counter int64 // a simple incrementing counter | |||
height int64 // height that this tx had been validated in | |||
evidence types.Evidence // | |||
} | |||
// Height returns the height for this transaction | |||
func (memTx *evpoolTx) Height() int { | |||
return int(atomic.LoadInt64(&memTx.height)) | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// TODO: | |||
// txCache maintains a cache of evidence | |||
type txCache struct { | |||
mtx sync.Mutex | |||
size int | |||
map_ map[string]struct{} | |||
list *list.List // to remove oldest tx when cache gets too big | |||
} | |||
// newTxCache returns a new txCache. | |||
func newTxCache(cacheSize int) *txCache { | |||
return &txCache{ | |||
size: cacheSize, | |||
map_: make(map[string]struct{}, cacheSize), | |||
list: list.New(), | |||
} | |||
} | |||
// Reset resets the txCache to empty. | |||
func (cache *txCache) Reset() { | |||
cache.mtx.Lock() | |||
cache.map_ = make(map[string]struct{}, cacheSize) | |||
cache.list.Init() | |||
cache.mtx.Unlock() | |||
} | |||
// Exists returns true if the given tx is cached. | |||
func (cache *txCache) Exists(tx types.Tx) bool { | |||
cache.mtx.Lock() | |||
_, exists := cache.map_[string(tx)] | |||
cache.mtx.Unlock() | |||
return exists | |||
} | |||
// Push adds the given tx to the txCache. It returns false if tx is already in the cache. | |||
func (cache *txCache) Push(tx types.Tx) bool { | |||
cache.mtx.Lock() | |||
defer cache.mtx.Unlock() | |||
if _, exists := cache.map_[string(tx)]; exists { | |||
return false | |||
} | |||
if cache.list.Len() >= cache.size { | |||
popped := cache.list.Front() | |||
poppedTx := popped.Value.(types.Tx) | |||
// NOTE: the tx may have already been removed from the map | |||
// but deleting a non-existent element is fine | |||
delete(cache.map_, string(poppedTx)) | |||
cache.list.Remove(popped) | |||
} | |||
cache.map_[string(tx)] = struct{}{} | |||
cache.list.PushBack(tx) | |||
return true | |||
} | |||
// Remove removes the given tx from the cache. | |||
func (cache *txCache) Remove(tx types.Tx) { | |||
cache.mtx.Lock() | |||
delete(cache.map_, string(tx)) | |||
cache.mtx.Unlock() | |||
} |
@ -0,0 +1,202 @@ | |||
package mempool | |||
import ( | |||
"crypto/rand" | |||
"encoding/binary" | |||
"testing" | |||
"time" | |||
"github.com/tendermint/abci/example/counter" | |||
"github.com/tendermint/abci/example/dummy" | |||
"github.com/tendermint/tmlibs/log" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { | |||
config := cfg.ResetTestRoot("mempool_test") | |||
appConnMem, _ := cc.NewABCIClient() | |||
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) | |||
appConnMem.Start() | |||
mempool := NewMempool(config.Mempool, appConnMem, 0) | |||
mempool.SetLogger(log.TestingLogger()) | |||
return mempool | |||
} | |||
func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) { | |||
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) | |||
select { | |||
case <-ch: | |||
t.Fatal("Expected not to fire") | |||
case <-timer.C: | |||
} | |||
} | |||
func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { | |||
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) | |||
select { | |||
case <-ch: | |||
case <-timer.C: | |||
t.Fatal("Expected to fire") | |||
} | |||
} | |||
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { | |||
txs := make(types.Txs, count) | |||
for i := 0; i < count; i++ { | |||
txBytes := make([]byte, 20) | |||
txs[i] = txBytes | |||
rand.Read(txBytes) | |||
err := mempool.CheckTx(txBytes, nil) | |||
if err != nil { | |||
t.Fatal("Error after CheckTx: %v", err) | |||
} | |||
} | |||
return txs | |||
} | |||
func TestTxsAvailable(t *testing.T) { | |||
app := dummy.NewDummyApplication() | |||
cc := proxy.NewLocalClientCreator(app) | |||
mempool := newMempoolWithApp(cc) | |||
mempool.EnableTxsAvailable() | |||
timeoutMS := 500 | |||
// with no txs, it shouldnt fire | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
// send a bunch of txs, it should only fire once | |||
txs := checkTxs(t, mempool, 100) | |||
ensureFire(t, mempool.TxsAvailable(), timeoutMS) | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
// call update with half the txs. | |||
// it should fire once now for the new height | |||
// since there are still txs left | |||
committedTxs, txs := txs[:50], txs[50:] | |||
mempool.Update(1, committedTxs) | |||
ensureFire(t, mempool.TxsAvailable(), timeoutMS) | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
// send a bunch more txs. we already fired for this height so it shouldnt fire again | |||
moreTxs := checkTxs(t, mempool, 50) | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
// now call update with all the txs. it should not fire as there are no txs left | |||
committedTxs = append(txs, moreTxs...) | |||
mempool.Update(2, committedTxs) | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
// send a bunch more txs, it should only fire once | |||
checkTxs(t, mempool, 100) | |||
ensureFire(t, mempool.TxsAvailable(), timeoutMS) | |||
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) | |||
} | |||
func TestSerialReap(t *testing.T) { | |||
app := counter.NewCounterApplication(true) | |||
app.SetOption("serial", "on") | |||
cc := proxy.NewLocalClientCreator(app) | |||
mempool := newMempoolWithApp(cc) | |||
appConnCon, _ := cc.NewABCIClient() | |||
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) | |||
if _, err := appConnCon.Start(); err != nil { | |||
t.Fatalf("Error starting ABCI client: %v", err.Error()) | |||
} | |||
deliverTxsRange := func(start, end int) { | |||
// Deliver some txs. | |||
for i := start; i < end; i++ { | |||
// This will succeed | |||
txBytes := make([]byte, 8) | |||
binary.BigEndian.PutUint64(txBytes, uint64(i)) | |||
err := mempool.CheckTx(txBytes, nil) | |||
if err != nil { | |||
t.Fatal("Error after CheckTx: %v", err) | |||
} | |||
// This will fail because not serial (incrementing) | |||
// However, error should still be nil. | |||
// It just won't show up on Reap(). | |||
err = mempool.CheckTx(txBytes, nil) | |||
if err != nil { | |||
t.Fatal("Error after CheckTx: %v", err) | |||
} | |||
} | |||
} | |||
reapCheck := func(exp int) { | |||
txs := mempool.Reap(-1) | |||
if len(txs) != exp { | |||
t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) | |||
} | |||
} | |||
updateRange := func(start, end int) { | |||
txs := make([]types.Tx, 0) | |||
for i := start; i < end; i++ { | |||
txBytes := make([]byte, 8) | |||
binary.BigEndian.PutUint64(txBytes, uint64(i)) | |||
txs = append(txs, txBytes) | |||
} | |||
mempool.Update(0, txs) | |||
} | |||
commitRange := func(start, end int) { | |||
// Deliver some txs. | |||
for i := start; i < end; i++ { | |||
txBytes := make([]byte, 8) | |||
binary.BigEndian.PutUint64(txBytes, uint64(i)) | |||
res := appConnCon.DeliverTxSync(txBytes) | |||
if !res.IsOK() { | |||
t.Errorf("Error committing tx. Code:%v result:%X log:%v", | |||
res.Code, res.Data, res.Log) | |||
} | |||
} | |||
res := appConnCon.CommitSync() | |||
if len(res.Data) != 8 { | |||
t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) | |||
} | |||
} | |||
//---------------------------------------- | |||
// Deliver some txs. | |||
deliverTxsRange(0, 100) | |||
// Reap the txs. | |||
reapCheck(100) | |||
// Reap again. We should get the same amount | |||
reapCheck(100) | |||
// Deliver 0 to 999, we should reap 900 new txs | |||
// because 100 were already counted. | |||
deliverTxsRange(0, 1000) | |||
// Reap the txs. | |||
reapCheck(1000) | |||
// Reap again. We should get the same amount | |||
reapCheck(1000) | |||
// Commit from the conensus AppConn | |||
commitRange(0, 500) | |||
updateRange(0, 500) | |||
// We should have 500 left. | |||
reapCheck(500) | |||
// Deliver 100 invalid txs and 100 valid txs | |||
deliverTxsRange(900, 1100) | |||
// We should have 600 now. | |||
reapCheck(600) | |||
} |
@ -0,0 +1,138 @@ | |||
package evpool | |||
import ( | |||
"bytes" | |||
"fmt" | |||
"reflect" | |||
wire "github.com/tendermint/go-wire" | |||
"github.com/tendermint/tmlibs/log" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/p2p" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
const ( | |||
EvidencePoolChannel = byte(0x38) | |||
maxEvidencePoolMessageSize = 1048576 // 1MB TODO make it configurable | |||
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount | |||
) | |||
// EvidencePoolReactor handles evpool evidence broadcasting amongst peers. | |||
type EvidencePoolReactor struct { | |||
p2p.BaseReactor | |||
config *cfg.EvidencePoolConfig | |||
EvidencePool *EvidencePool | |||
evsw types.EventSwitch | |||
} | |||
// NewEvidencePoolReactor returns a new EvidencePoolReactor with the given config and evpool. | |||
func NewEvidencePoolReactor(config *cfg.EvidencePoolConfig, evpool *EvidencePool) *EvidencePoolReactor { | |||
evR := &EvidencePoolReactor{ | |||
config: config, | |||
EvidencePool: evpool, | |||
} | |||
evR.BaseReactor = *p2p.NewBaseReactor("EvidencePoolReactor", evR) | |||
return evR | |||
} | |||
// SetLogger sets the Logger on the reactor and the underlying EvidencePool. | |||
func (evR *EvidencePoolReactor) SetLogger(l log.Logger) { | |||
evR.Logger = l | |||
evR.EvidencePool.SetLogger(l) | |||
} | |||
// GetChannels implements Reactor. | |||
// It returns the list of channels for this reactor. | |||
func (evR *EvidencePoolReactor) GetChannels() []*p2p.ChannelDescriptor { | |||
return []*p2p.ChannelDescriptor{ | |||
&p2p.ChannelDescriptor{ | |||
ID: EvidencePoolChannel, | |||
Priority: 5, | |||
}, | |||
} | |||
} | |||
// AddPeer implements Reactor. | |||
func (evR *EvidencePoolReactor) AddPeer(peer p2p.Peer) { | |||
// send the new peer all current evidence | |||
evidence := evR.evpool.Evidence() | |||
msg := EvidenceMessage{evidence} | |||
success := peer.Send(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) | |||
if !success { | |||
// TODO: remove peer ? | |||
} | |||
} | |||
// RemovePeer implements Reactor. | |||
func (evR *EvidencePoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { | |||
} | |||
// Receive implements Reactor. | |||
// It adds any received evidence to the evpool. | |||
func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { | |||
_, msg, err := DecodeMessage(msgBytes) | |||
if err != nil { | |||
evR.Logger.Error("Error decoding message", "err", err) | |||
return | |||
} | |||
evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) | |||
switch msg := msg.(type) { | |||
case *EvidenceMessage: | |||
for _, ev := range msg.Evidence { | |||
err := evR.EvidencePool.AddEvidence(msg.Evidence, nil) | |||
if err != nil { | |||
evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) | |||
// TODO: punish peer | |||
} else { | |||
// TODO: broadcast good evidence to all peers (except sender? ) | |||
} | |||
} | |||
default: | |||
evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) | |||
} | |||
} | |||
// SetEventSwitch implements events.Eventable. | |||
func (evR *EvidencePoolReactor) SetEventSwitch(evsw types.EventSwitch) { | |||
evR.evsw = evsw | |||
} | |||
//----------------------------------------------------------------------------- | |||
// Messages | |||
const ( | |||
msgTypeEvidence = byte(0x01) | |||
) | |||
// EvidencePoolMessage is a message sent or received by the EvidencePoolReactor. | |||
type EvidencePoolMessage interface{} | |||
var _ = wire.RegisterInterface( | |||
struct{ EvidencePoolMessage }{}, | |||
wire.ConcreteType{&EvidenceMessage{}, msgTypeEvidence}, | |||
) | |||
// DecodeMessage decodes a byte-array into a EvidencePoolMessage. | |||
func DecodeMessage(bz []byte) (msgType byte, msg EvidencePoolMessage, err error) { | |||
msgType = bz[0] | |||
n := new(int) | |||
r := bytes.NewReader(bz) | |||
msg = wire.ReadBinary(struct{ EvidencePoolMessage }{}, r, maxEvidencePoolMessageSize, n, &err).(struct{ EvidencePoolMessage }).EvidencePoolMessage | |||
return | |||
} | |||
//------------------------------------- | |||
// EvidenceMessage contains a list of evidence. | |||
type EvidenceMessage struct { | |||
Evidence types.Evidences | |||
} | |||
// String returns a string representation of the EvidenceMessage. | |||
func (m *EvidenceMessage) String() string { | |||
return fmt.Sprintf("[EvidenceMessage %v]", m.Evidence) | |||
} |
@ -0,0 +1,108 @@ | |||
package evpool | |||
import ( | |||
"fmt" | |||
"sync" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/go-kit/kit/log/term" | |||
"github.com/tendermint/abci/example/dummy" | |||
"github.com/tendermint/tmlibs/log" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/p2p" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// evpoolLogger is a TestingLogger which uses a different | |||
// color for each validator ("validator" key must exist). | |||
func evpoolLogger() log.Logger { | |||
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { | |||
for i := 0; i < len(keyvals)-1; i += 2 { | |||
if keyvals[i] == "validator" { | |||
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} | |||
} | |||
} | |||
return term.FgBgColor{} | |||
}) | |||
} | |||
// connect N evpool reactors through N switches | |||
func makeAndConnectEvidencePoolReactors(config *cfg.Config, N int) []*EvidencePoolReactor { | |||
reactors := make([]*EvidencePoolReactor, N) | |||
logger := evpoolLogger() | |||
for i := 0; i < N; i++ { | |||
app := dummy.NewDummyApplication() | |||
cc := proxy.NewLocalClientCreator(app) | |||
evpool := newEvidencePoolWithApp(cc) | |||
reactors[i] = NewEvidencePoolReactor(config.EvidencePool, evpool) // so we dont start the consensus states | |||
reactors[i].SetLogger(logger.With("validator", i)) | |||
} | |||
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { | |||
s.AddReactor("MEMPOOL", reactors[i]) | |||
return s | |||
}, p2p.Connect2Switches) | |||
return reactors | |||
} | |||
// wait for all evidences on all reactors | |||
func waitForTxs(t *testing.T, evidences types.Txs, reactors []*EvidencePoolReactor) { | |||
// wait for the evidences in all evpools | |||
wg := new(sync.WaitGroup) | |||
for i := 0; i < len(reactors); i++ { | |||
wg.Add(1) | |||
go _waitForTxs(t, wg, evidences, i, reactors) | |||
} | |||
done := make(chan struct{}) | |||
go func() { | |||
wg.Wait() | |||
close(done) | |||
}() | |||
timer := time.After(TIMEOUT) | |||
select { | |||
case <-timer: | |||
t.Fatal("Timed out waiting for evidences") | |||
case <-done: | |||
} | |||
} | |||
// wait for all evidences on a single evpool | |||
func _waitForTxs(t *testing.T, wg *sync.WaitGroup, evidences types.Txs, reactorIdx int, reactors []*EvidencePoolReactor) { | |||
evpool := reactors[reactorIdx].EvidencePool | |||
for evpool.Size() != len(evidences) { | |||
time.Sleep(time.Second) | |||
} | |||
reapedTxs := evpool.Reap(len(evidences)) | |||
for i, evidence := range evidences { | |||
assert.Equal(t, evidence, reapedTxs[i], fmt.Sprintf("evidences at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, evidence, reapedTxs[i])) | |||
} | |||
wg.Done() | |||
} | |||
var ( | |||
NUM_TXS = 1000 | |||
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow | |||
) | |||
func TestReactorBroadcastTxMessage(t *testing.T) { | |||
config := cfg.TestConfig() | |||
N := 4 | |||
reactors := makeAndConnectEvidencePoolReactors(config, N) | |||
// send a bunch of evidences to the first reactor's evpool | |||
// and wait for them all to be received in the others | |||
evidences := checkTxs(t, reactors[0].EvidencePool, NUM_TXS) | |||
waitForTxs(t, evidences, reactors) | |||
} |