diff --git a/consensus/state.go b/consensus/state.go index b26095de7..7b7c04c42 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -17,6 +17,7 @@ import ( cfg "github.com/tendermint/tendermint/config" cstypes "github.com/tendermint/tendermint/consensus/types" + evpool "github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -78,7 +79,7 @@ type ConsensusState struct { proxyAppConn proxy.AppConnConsensus blockStore types.BlockStore mempool types.Mempool - evpool types.EvidencePool + evpool evpool.EvidencePool // internal state mtx sync.Mutex @@ -114,7 +115,7 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.Mempool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, @@ -126,7 +127,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon done: make(chan struct{}), doWALCatchup: true, wal: nilWAL{}, - evpool: evpool, + // evpool: evpool, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -866,7 +867,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs) block, parts := cs.state.MakeBlock(cs.Height, txs, commit) - evidence := cs.evpool.Evidence() + evidence := cs.evpool.PendingEvidence() block.AddEvidence(evidence) return block, parts } diff --git a/evidence/evidence_pool.go b/evidence/evidence_pool.go index d84eb13d8..3a31c40c3 100644 --- a/evidence/evidence_pool.go +++ b/evidence/evidence_pool.go @@ -1,14 +1,8 @@ package evpool import ( - "container/list" - "fmt" - "sync" - "sync/atomic" - "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" ) @@ -16,25 +10,23 @@ const cacheSize = 100000 // EvidencePool maintains a set of valid uncommitted evidence. type EvidencePool struct { - config *cfg.EvidencePoolConfig - - mtx sync.Mutex - height int // the last block Update()'d to - evidence types.Evidences - // TODO: evidenceCache + config *EvidencePoolConfig + logger log.Logger - // TODO: need to persist evidence so we never lose it + evidenceStore *EvidenceStore + newEvidenceChan chan types.Evidence +} - logger log.Logger +type EvidencePoolConfig struct { } -func NewEvidencePool(config *cfg.EvidencePoolConfig, height int) *EvidencePool { +func NewEvidencePool(config *EvidencePoolConfig, evidenceStore *EvidenceStore) *EvidencePool { evpool := &EvidencePool{ - config: config, - height: height, - logger: log.NewNopLogger(), + config: config, + logger: log.NewNopLogger(), + evidenceStore: evidenceStore, + newEvidenceChan: make(chan types.Evidence), } - evpool.initWAL() return evpool } @@ -43,160 +35,59 @@ func (evpool *EvidencePool) SetLogger(l log.Logger) { evpool.logger = l } -// Evidence returns a copy of the pool's evidence. -func (evpool *EvidencePool) Evidence() types.Evidences { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - - evCopy := make(types.Evidences, len(evpool.evidence)) - for i, ev := range evpool.evidence { - evCopy[i] = ev - } - return evCopy +// NewEvidenceChan returns a channel on which new evidence is sent. +func (evpool *EvidencePool) NewEvidenceChan() chan types.Evidence { + return evpool.newEvidenceChan } -// Size returns the number of pieces of evidence in the evpool. -func (evpool *EvidencePool) Size() int { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - return len(evpool.evidence) +// PriorityEvidence returns the priority evidence. +func (evpool *EvidencePool) PriorityEvidence() []types.Evidence { + // TODO + return nil } -// Flush removes all evidence from the evpool -func (evpool *EvidencePool) Flush() { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - evpool.evidence = make(types.Evidence) +// PendingEvidence returns all uncommitted evidence. +func (evpool *EvidencePool) PendingEvidence() []types.Evidence { + // TODO + return nil } // AddEvidence checks the evidence is valid and adds it to the pool. func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - - if evpool.evidence.Has(evidence) { - return fmt.Errorf("Evidence already exists", "evidence", evidence) + idx := 1 // TODO + added, err := evpool.evidenceStore.AddNewEvidence(idx, evidence) + if err != nil { + return err + } else if !added { + // evidence already known, just ignore + return } - cs.Logger.Info("Found conflicting vote. Recording evidence", "evidence", ev) - evpool.evidence = append(evpool.evidence, ev) - // TODO: write to disk ? WAL ? + + evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence) + + evpool.newEvidenceChan <- evidence return nil } // Update informs the evpool that the given evidence was committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. -func (evpool *EvidencePool) Update(height int, evidence types.Evidences) { +func (evpool *EvidencePool) Update(height int, evidence []types.Evidence) { + + // First, create a lookup map of new committed evidence - // First, create a lookup map of txns in new txs. evMap := make(map[string]struct{}) for _, ev := range evidence { - evMap[string(evidence.Hash())] = struct{}{} + evpool.evidenceStore.MarkEvidenceAsCommitted(ev) + evMap[string(ev.Hash())] = struct{}{} } - // Set height - evpool.height = height - // Remove evidence that is already committed . goodEvidence := evpool.filterEvidence(evMap) _ = goodEvidence } -// TODO: -func (evpool *EvidencePool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { - goodTxs := make([]types.Tx, 0, evpool.txs.Len()) - for e := evpool.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*evpoolTx) - // Remove the tx if it's alredy in a block. - if _, ok := blockTxsMap[string(memTx.tx)]; ok { - // remove from clist - evpool.txs.Remove(e) - e.DetachPrev() - - // NOTE: we don't remove committed txs from the cache. - continue - } - // Good tx! - goodTxs = append(goodTxs, memTx.tx) - } - return goodTxs -} - -//-------------------------------------------------------------------------------- - -// evpoolTx is a transaction that successfully ran -type evpoolEvidence struct { - counter int64 // a simple incrementing counter - height int64 // height that this tx had been validated in - evidence types.Evidence // -} - -// Height returns the height for this transaction -func (memTx *evpoolTx) Height() int { - return int(atomic.LoadInt64(&memTx.height)) -} - -//-------------------------------------------------------------------------------- -// TODO: - -// txCache maintains a cache of evidence -type txCache struct { - mtx sync.Mutex - size int - map_ map[string]struct{} - list *list.List // to remove oldest tx when cache gets too big -} - -// newTxCache returns a new txCache. -func newTxCache(cacheSize int) *txCache { - return &txCache{ - size: cacheSize, - map_: make(map[string]struct{}, cacheSize), - list: list.New(), - } -} - -// Reset resets the txCache to empty. -func (cache *txCache) Reset() { - cache.mtx.Lock() - cache.map_ = make(map[string]struct{}, cacheSize) - cache.list.Init() - cache.mtx.Unlock() -} - -// Exists returns true if the given tx is cached. -func (cache *txCache) Exists(tx types.Tx) bool { - cache.mtx.Lock() - _, exists := cache.map_[string(tx)] - cache.mtx.Unlock() - return exists -} - -// Push adds the given tx to the txCache. It returns false if tx is already in the cache. -func (cache *txCache) Push(tx types.Tx) bool { - cache.mtx.Lock() - defer cache.mtx.Unlock() - - if _, exists := cache.map_[string(tx)]; exists { - return false - } - - if cache.list.Len() >= cache.size { - popped := cache.list.Front() - poppedTx := popped.Value.(types.Tx) - // NOTE: the tx may have already been removed from the map - // but deleting a non-existent element is fine - delete(cache.map_, string(poppedTx)) - cache.list.Remove(popped) - } - cache.map_[string(tx)] = struct{}{} - cache.list.PushBack(tx) - return true -} - -// Remove removes the given tx from the cache. -func (cache *txCache) Remove(tx types.Tx) { - cache.mtx.Lock() - delete(cache.map_, string(tx)) - cache.mtx.Unlock() +func (evpool *EvidencePool) filterEvidence(blockEvidenceMap map[string]struct{}) []types.Evidence { + // TODO: + return nil } diff --git a/evidence/evidence_pool_test.go b/evidence/evidence_pool_test.go index 46401e88b..fba5941c6 100644 --- a/evidence/evidence_pool_test.go +++ b/evidence/evidence_pool_test.go @@ -1,4 +1,4 @@ -package mempool +package evpool import ( "crypto/rand" diff --git a/evidence/reactor.go b/evidence/reactor.go index 7554bb4c2..d62e88b3d 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -4,11 +4,11 @@ import ( "bytes" "fmt" "reflect" + "time" wire "github.com/tendermint/go-wire" "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -18,21 +18,22 @@ const ( maxEvidencePoolMessageSize = 1048576 // 1MB TODO make it configurable peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount + broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often ) // EvidencePoolReactor handles evpool evidence broadcasting amongst peers. type EvidencePoolReactor struct { p2p.BaseReactor - config *cfg.EvidencePoolConfig - EvidencePool *EvidencePool - evsw types.EventSwitch + config *EvidencePoolConfig + evpool *EvidencePool + evsw types.EventSwitch } // NewEvidencePoolReactor returns a new EvidencePoolReactor with the given config and evpool. -func NewEvidencePoolReactor(config *cfg.EvidencePoolConfig, evpool *EvidencePool) *EvidencePoolReactor { +func NewEvidencePoolReactor(config *EvidencePoolConfig, evpool *EvidencePool) *EvidencePoolReactor { evR := &EvidencePoolReactor{ - config: config, - EvidencePool: evpool, + config: config, + evpool: evpool, } evR.BaseReactor = *p2p.NewBaseReactor("EvidencePoolReactor", evR) return evR @@ -41,7 +42,16 @@ func NewEvidencePoolReactor(config *cfg.EvidencePoolConfig, evpool *EvidencePool // SetLogger sets the Logger on the reactor and the underlying EvidencePool. func (evR *EvidencePoolReactor) SetLogger(l log.Logger) { evR.Logger = l - evR.EvidencePool.SetLogger(l) + evR.evpool.SetLogger(l) +} + +// OnStart implements cmn.Service +func (evR *EvidencePoolReactor) OnStart() error { + if err := evR.BaseReactor.OnStart(); err != nil { + return err + } + go evR.broadcastRoutine() + return nil } // GetChannels implements Reactor. @@ -57,13 +67,16 @@ func (evR *EvidencePoolReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *EvidencePoolReactor) AddPeer(peer p2p.Peer) { - // send the new peer all current evidence - evidence := evR.evpool.Evidence() + // first send the peer high-priority evidence + evidence := evR.evpool.PriorityEvidence() msg := EvidenceMessage{evidence} success := peer.Send(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) if !success { // TODO: remove peer ? } + + // TODO: send the remaining pending evidence + // or just let the broadcastRoutine do it ? } // RemovePeer implements Reactor. @@ -83,12 +96,10 @@ func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte switch msg := msg.(type) { case *EvidenceMessage: for _, ev := range msg.Evidence { - err := evR.EvidencePool.AddEvidence(msg.Evidence, nil) + err := evR.evpool.AddEvidence(ev) if err != nil { evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) // TODO: punish peer - } else { - // TODO: broadcast good evidence to all peers (except sender? ) } } default: @@ -101,6 +112,30 @@ func (evR *EvidencePoolReactor) SetEventSwitch(evsw types.EventSwitch) { evR.evsw = evsw } +// broadcast new evidence to all peers +func (evR *EvidencePoolReactor) broadcastRoutine() { + ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) + for { + select { + case evidence := <-evR.evpool.NewEvidenceChan(): + // broadcast some new evidence + msg := EvidenceMessage{[]types.Evidence{evidence}} + evR.Switch.Broadcast(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) + + // NOTE: Broadcast runs asynchronously, so this should wait on the successChan + // in another routine before marking to be proper. + idx := 1 // TODO + evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(idx, evidence) + case <-ticker.C: + // broadcast all pending evidence + msg := EvidenceMessage{evR.evpool.PendingEvidence()} + evR.Switch.Broadcast(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) + case <-evR.Quit: + return + } + } +} + //----------------------------------------------------------------------------- // Messages @@ -129,7 +164,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg EvidencePoolMessage, err error) // EvidenceMessage contains a list of evidence. type EvidenceMessage struct { - Evidence types.Evidences + Evidence []types.Evidence } // String returns a string representation of the EvidenceMessage. diff --git a/evidence/store.go b/evidence/store.go new file mode 100644 index 000000000..82268391b --- /dev/null +++ b/evidence/store.go @@ -0,0 +1,100 @@ +package evpool + +import ( + "fmt" + + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" +) + +/* +"evidence-lookup"// -> evidence struct +"evidence-outqueue"/// -> nil +"evidence-pending"//evidence-hash> -> nil +*/ + +var nullValue = []byte{0} + +type evidenceInfo struct { + Committed bool + Priority int + Evidence types.Evidence +} + +func keyLookup(evidence types.Evidence) []byte { + return []byte(fmt.Sprintf("evidence-lookup/%d/%X", evidence.Height(), evidence.Hash())) +} + +func keyOutqueue(idx int, evidence types.Evidence) []byte { + return []byte(fmt.Sprintf("evidence-outqueue/%d/%d/%X", idx, evidence.Height(), evidence.Hash())) +} + +func keyPending(evidence types.Evidence) []byte { + return []byte(fmt.Sprintf("evidence-pending/%d/%X", evidence.Height(), evidence.Hash())) +} + +// EvidenceStore stores all the evidence we've seen, including +// evidence that has been committed, evidence that has been seen but not broadcast, +// and evidence that has been broadcast but not yet committed. +type EvidenceStore struct { + chainID string + db dbm.DB +} + +func NewEvidenceStore(chainID string, db dbm.DB) *EvidenceStore { + return &EvidenceStore{ + chainID: chainID, + db: db, + } +} + +// AddNewEvidence adds the given evidence to the database. +func (store *EvidenceStore) AddNewEvidence(idx int, evidence types.Evidence) (bool, error) { + // check if we already have seen it + key := keyLookup(evidence) + v := store.db.Get(key) + if len(v) == 0 { + return false, nil + } + + // verify the evidence + if err := evidence.Verify(store.chainID); err != nil { + return false, err + } + + // add it to the store + ei := evidenceInfo{ + Committed: false, + Priority: idx, + Evidence: evidence, + } + store.db.Set(key, wire.BinaryBytes(ei)) + + key = keyOutqueue(idx, evidence) + store.db.Set(key, nullValue) + + key = keyPending(evidence) + store.db.Set(key, nullValue) + + return true, nil +} + +// MarkEvidenceAsBroadcasted removes evidence from the outqueue. +func (store *EvidenceStore) MarkEvidenceAsBroadcasted(idx int, evidence types.Evidence) { + key := keyOutqueue(idx, evidence) + store.db.Delete(key) +} + +// MarkEvidenceAsPending removes evidence from pending and sets the state to committed. +func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) { + key := keyPending(evidence) + store.db.Delete(key) + + key = keyLookup(evidence) + var ei evidenceInfo + b := store.db.Get(key) + wire.ReadBinaryBytes(b, &ei) + ei.Committed = true + store.db.Set(key, wire.BinaryBytes(ei)) +} diff --git a/types/block.go b/types/block.go index b3e754427..96bfbf9d5 100644 --- a/types/block.go +++ b/types/block.go @@ -437,7 +437,8 @@ func (data *Data) StringIndented(indent string) string { // EvidenceData contains any evidence of malicious wrong-doing by validators type EvidenceData struct { - Evidences Evidences `json:"evidence"` + // TODO: FIXME + Evidences evidences `json:"evidence"` // Volatile hash data.Bytes diff --git a/types/evidence.go b/types/evidence.go index d40c63ea2..65f34e90e 100644 --- a/types/evidence.go +++ b/types/evidence.go @@ -3,6 +3,7 @@ package types import ( "bytes" "fmt" + "sync" "github.com/tendermint/go-crypto" "github.com/tendermint/tmlibs/merkle" @@ -27,6 +28,7 @@ func (err *ErrEvidenceInvalid) Error() string { // Evidence represents any provable malicious activity by a validator type Evidence interface { + Height() int Address() []byte Hash() []byte Verify(chainID string) error @@ -37,9 +39,72 @@ type Evidence interface { //------------------------------------------- -type Evidences []Evidence +//EvidenceSet is a thread-safe set of evidence. +type EvidenceSet struct { + sync.RWMutex + evidences evidences +} + +//Evidence returns a copy of all the evidence. +func (evset EvidenceSet) Evidence() []Evidence { + evset.RLock() + defer evset.RUnlock() + evCopy := make([]Evidence, len(evset.evidences)) + for i, ev := range evset.evidences { + evCopy[i] = ev + } + return evCopy +} + +// Size returns the number of pieces of evidence in the set. +func (evset EvidenceSet) Size() int { + evset.RLock() + defer evset.RUnlock() + return len(evset.evidences) +} -func (evs Evidences) Hash() []byte { +// Hash returns a merkle hash of the evidence. +func (evset EvidenceSet) Hash() []byte { + evset.RLock() + defer evset.RUnlock() + return evset.evidences.Hash() +} + +// Has returns true if the given evidence is in the set. +func (evset EvidenceSet) Has(evidence Evidence) bool { + evset.RLock() + defer evset.RUnlock() + return evset.evidences.Has(evidence) +} + +// String returns a string representation of the evidence. +func (evset EvidenceSet) String() string { + evset.RLock() + defer evset.RUnlock() + return evset.evidences.String() +} + +// Add adds the given evidence to the set. +// TODO: and persists it to disk. +func (evset EvidenceSet) Add(evidence Evidence) { + evset.Lock() + defer evset.Unlock() + evset.evidences = append(evset.evidences, evidence) +} + +// Reset empties the evidence set. +func (evset EvidenceSet) Reset() { + evset.Lock() + defer evset.Unlock() + evset.evidences = make(evidences, 0) + +} + +//------------------------------------------- + +type evidences []Evidence + +func (evs evidences) Hash() []byte { // Recursive impl. // Copied from tmlibs/merkle to avoid allocations switch len(evs) { @@ -48,13 +113,13 @@ func (evs Evidences) Hash() []byte { case 1: return evs[0].Hash() default: - left := Evidences(evs[:(len(evs)+1)/2]).Hash() - right := Evidences(evs[(len(evs)+1)/2:]).Hash() + left := evidences(evs[:(len(evs)+1)/2]).Hash() + right := evidences(evs[(len(evs)+1)/2:]).Hash() return merkle.SimpleHashFromTwoHashes(left, right) } } -func (evs Evidences) String() string { +func (evs evidences) String() string { s := "" for _, e := range evs { s += fmt.Sprintf("%s\t\t", e) @@ -62,7 +127,7 @@ func (evs Evidences) String() string { return s } -func (evs Evidences) Has(evidence Evidence) bool { +func (evs evidences) Has(evidence Evidence) bool { for _, ev := range evs { if ev.Equal(evidence) { return true @@ -86,6 +151,11 @@ func (dve *DuplicateVoteEvidence) String() string { } +// Height returns the height this evidence refers to. +func (dve *DuplicateVoteEvidence) Height() int { + return dve.VoteA.Height +} + // Address returns the address of the validator. func (dve *DuplicateVoteEvidence) Address() []byte { return dve.PubKey.Address()