From 7d81a3f4a5fba31a08f13e0f0180c9f9c8b934ae Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 27 Dec 2017 01:27:03 -0500 Subject: [PATCH] address some comments from review --- consensus/replay.go | 1 + evidence/pool.go | 9 +++++++-- evidence/reactor.go | 8 ++++---- evidence/store.go | 11 ++++------- node/node.go | 2 +- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 56f09d129..209ea5972 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -323,6 +323,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store // Note that we don't have an old version of the state, // so we by-pass state validation/mutation using sm.ExecCommitBlock. // This also means we won't be saving validator sets if they change during this period. + // TODO: Load the historical information to fix this and just use state.ApplyBlock // // If mutateState == true, the final block is replayed with h.replayBlock() diff --git a/evidence/pool.go b/evidence/pool.go index 821fdf3c2..2296ac028 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -12,9 +12,10 @@ type EvidencePool struct { params types.EvidenceParams logger log.Logger - state types.State + state types.State // TODO: update this on commit! evidenceStore *EvidenceStore + // never close evidenceChan chan types.Evidence } @@ -52,10 +53,13 @@ func (evpool *EvidencePool) PendingEvidence() []types.Evidence { // AddEvidence checks the evidence is valid and adds it to the pool. // Blocks on the EvidenceChan. func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { + // TODO: check if we already have evidence for this + // validator at this height so we dont get spammed - // XXX: is this thread safe ? priority, err := evpool.state.VerifyEvidence(evidence) if err != nil { + // TODO: if err is just that we cant find it cuz we pruned, ignore. + // TODO: if its actually bad evidence, punish peer return err } @@ -67,6 +71,7 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence) + // never closes. always safe to send on evpool.evidenceChan <- evidence return nil } diff --git a/evidence/reactor.go b/evidence/reactor.go index ff502ed0c..cb9706a34 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -66,8 +66,8 @@ func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor { func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) { // send the peer our high-priority evidence. // the rest will be sent by the broadcastRoutine - evidence := evR.evpool.PriorityEvidence() - msg := &EvidenceListMessage{evidence} + evidences := evR.evpool.PriorityEvidence() + msg := &EvidenceListMessage{evidences} success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg}) if !success { // TODO: remove peer ? @@ -108,8 +108,8 @@ func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) { evR.eventBus = b } -// broadcast new evidence to all peers. -// broadcasts must be non-blocking so routine is always available to read off EvidenceChan. +// Broadcast new evidence to all peers. +// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan. func (evR *EvidenceReactor) broadcastRoutine() { ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) for { diff --git a/evidence/store.go b/evidence/store.go index 5b677c347..fd40b5338 100644 --- a/evidence/store.go +++ b/evidence/store.go @@ -47,20 +47,20 @@ func keyLookup(evidence types.Evidence) []byte { } // big endian padded hex -func be(h int64) string { +func bE(h int64) string { return fmt.Sprintf("%0.16X", h) } func keyLookupFromHeightAndHash(height int64, hash []byte) []byte { - return _key("%s/%s/%X", baseKeyLookup, be(height), hash) + return _key("%s/%s/%X", baseKeyLookup, bE(height), hash) } func keyOutqueue(evidence types.Evidence, priority int64) []byte { - return _key("%s/%s/%s/%X", baseKeyOutqueue, be(priority), be(evidence.Height()), evidence.Hash()) + return _key("%s/%s/%s/%X", baseKeyOutqueue, bE(priority), bE(evidence.Height()), evidence.Hash()) } func keyPending(evidence types.Evidence) []byte { - return _key("%s/%s/%X", baseKeyPending, be(evidence.Height()), evidence.Hash()) + return _key("%s/%s/%X", baseKeyPending, bE(evidence.Height()), evidence.Hash()) } func _key(fmt_ string, o ...interface{}) []byte { @@ -170,9 +170,6 @@ func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) { ei := store.getEvidenceInfo(evidence) ei.Committed = true - // TODO: we should use the state db and db.Sync in state.Save instead. - // Else, if we call this before state.Save, we may never mark committed evidence as committed. - // Else, if we call this after state.Save, we may get stuck broadcasting evidence we never know we committed. lookupKey := keyLookup(evidence) store.db.SetSync(lookupKey, wire.BinaryBytes(ei)) } diff --git a/node/node.go b/node/node.go index 1f51354a8..53eab6e0e 100644 --- a/node/node.go +++ b/node/node.go @@ -216,7 +216,7 @@ func NewNode(config *cfg.Config, } evidenceLogger := logger.With("module", "evidence") evidenceStore := evidence.NewEvidenceStore(evidenceDB) - evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state) + evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy()) evidencePool.SetLogger(evidenceLogger) evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger)