diff --git a/evidence/reactor.go b/evidence/reactor.go index a6aa66b1f..de18f6b8e 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/go-amino" "github.com/tendermint/tmlibs/log" + cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -118,21 +119,48 @@ func (evR *EvidenceReactor) broadcastRoutine() { case evidence := <-evR.evpool.EvidenceChan(): // broadcast some new evidence msg := &EvidenceListMessage{[]types.Evidence{evidence}} - evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + evR.broadcastEvidenceListMsg(msg) - // TODO: Broadcast runs asynchronously, so this should wait on the successChan - // in another routine before marking to be proper. + // TODO: the broadcast here is just doing TrySend. + // We should make sure the send succeeds before marking broadcasted. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) case <-ticker.C: // broadcast all pending evidence msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} - evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + evR.broadcastEvidenceListMsg(msg) case <-evR.Quit(): return } } } +func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { + // NOTE: we dont send evidence to peers higher than their height, + // because they can't validate it (don't have validators from the height). + // So, for now, only send the `msg` to peers synced to the highest height in the list. + // TODO: send each peer all the evidence below its current height - + // might require a routine per peer, like the mempool. + + var maxHeight int64 + for _, ev := range msg.Evidence { + if ev.Height() > maxHeight { + maxHeight = ev.Height() + } + } + + for _, peer := range evR.Switch.Peers().List() { + ps := peer.Get(types.PeerStateKey).(PeerState) + rs := ps.GetRoundState() + if rs.Height >= maxHeight { + peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + } + } +} + +type PeerState interface { + GetRoundState() *cstypes.PeerRoundState +} + //----------------------------------------------------------------------------- // Messages diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 11c63929b..6b4b24a07 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" + cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -130,8 +131,50 @@ func TestReactorBroadcastEvidence(t *testing.T) { // make reactors from statedb reactors := makeAndConnectEvidenceReactors(config, stateDBs) + // set the peer height on each reactor + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + ps := peerState{height} + peer.Set(types.PeerStateKey, ps) + } + } + // send a bunch of valid evidence to the first reactor's evpool // and wait for them all to be received in the others evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE) waitForEvidence(t, evList, reactors) } + +type peerState struct { + height int64 +} + +func (ps peerState) GetRoundState() *cstypes.PeerRoundState { + return &cstypes.PeerRoundState{ + Height: ps.height, + } +} + +func TestReactorSelectiveBroadcast(t *testing.T) { + config := cfg.TestConfig() + + valAddr := []byte("myval") + height1 := int64(NUM_EVIDENCE) + 10 + height2 := int64(NUM_EVIDENCE) / 2 + + // DB1 is ahead of DB2 + stateDB1 := initializeValidatorState(valAddr, height1) + stateDB2 := initializeValidatorState(valAddr, height2) + + // make reactors from statedb + reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2}) + peer := reactors[0].Switch.Peers().List()[0] + ps := peerState{height2} + peer.Set(types.PeerStateKey, ps) + + // send a bunch of valid evidence to the first reactor's evpool + evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE) + + // only ones less than the peers height should make it through + waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2]) +} diff --git a/evidence/store.go b/evidence/store.go index 081715e3c..abfc5e1fe 100644 --- a/evidence/store.go +++ b/evidence/store.go @@ -164,7 +164,7 @@ func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) { store.db.Delete(key) } -// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed. +// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed. func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) { // if its committed, its been broadcast store.MarkEvidenceAsBroadcasted(evidence) diff --git a/state/execution.go b/state/execution.go index 3fe35e2fa..81d157f1d 100644 --- a/state/execution.go +++ b/state/execution.go @@ -106,7 +106,7 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block fail.Fail() // XXX - // Update evpool now that state is saved + // Update evpool now that state is saved. // TODO: handle the crash/recover scenario // ie. (may need to call Update for last block) blockExec.evpool.Update(block)