Browse Source

Merge pull request #1628 from tendermint/bucky/selective-evidence-broadcast

evidence: dont send evidence to unsynced peers
pull/1693/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
61002ad264
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 359 additions and 232 deletions
  1. +7
    -2
      CHANGELOG.md
  2. +1
    -1
      blockchain/reactor_test.go
  3. +1
    -1
      consensus/common_test.go
  4. +3
    -3
      consensus/replay.go
  5. +1
    -1
      consensus/replay_file.go
  6. +2
    -2
      consensus/replay_test.go
  7. +4
    -4
      consensus/state.go
  8. +2
    -2
      consensus/wal_generator.go
  9. +53
    -21
      evidence/pool.go
  10. +9
    -11
      evidence/pool_test.go
  11. +90
    -31
      evidence/reactor.go
  12. +47
    -2
      evidence/reactor_test.go
  13. +1
    -5
      evidence/store.go
  14. +6
    -6
      rpc/core/pipe.go
  15. +30
    -32
      state/execution.go
  16. +4
    -3
      state/execution_test.go
  17. +28
    -35
      state/services.go
  18. +31
    -31
      state/state.go
  19. +7
    -7
      state/store.go
  20. +32
    -32
      state/validation.go

+ 7
- 2
CHANGELOG.md View File

@ -2,7 +2,6 @@
## 0.19.9
*TBD*
## 0.19.8
@ -38,7 +37,7 @@ FEATURES
IMPROVEMENTS:
- [consensus] consensus reactor now receives events from a separate event bus,
- [consensus] Consensus reactor now receives events from a separate synchronous event bus,
which is not dependant on external RPC load
- [consensus/wal] do not look for height in older files if we've seen height - 1
- [docs] Various cleanup and link fixes
@ -51,6 +50,12 @@ BUG FIXES
- [blockchain] Fix fast-sync deadlock during high peer turnover
BUG FIX:
- [evidence] Dont send peers evidence from heights they haven't synced to yet
- [p2p] Refuse connections to more than one peer with the same IP
- [docs] Various fixes
## 0.19.5
*May 20th, 2018*


+ 1
- 1
blockchain/reactor_test.go View File

@ -36,7 +36,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
fastSync := true
var nilApp proxy.AppConnConsensus
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp,
types.MockMempool{}, types.MockEvidencePool{})
sm.MockMempool{}, sm.MockEvidencePool{})
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))


+ 1
- 1
consensus/common_test.go View File

@ -262,7 +262,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
}
// mock the evidence pool
evpool := types.MockEvidencePool{}
evpool := sm.MockEvidencePool{}
// Make ConsensusState
stateDB := dbm.NewMemDB()


+ 3
- 3
consensus/replay.go View File

@ -196,7 +196,7 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
type Handshaker struct {
stateDB dbm.DB
initialState sm.State
store types.BlockStore
store sm.BlockStore
appState json.RawMessage
logger log.Logger
@ -204,7 +204,7 @@ type Handshaker struct {
}
func NewHandshaker(stateDB dbm.DB, state sm.State,
store types.BlockStore, appState json.RawMessage) *Handshaker {
store sm.BlockStore, appState json.RawMessage) *Handshaker {
return &Handshaker{
stateDB: stateDB,
@ -390,7 +390,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{})
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)


+ 1
- 1
consensus/replay_file.go View File

@ -310,7 +310,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
}
mempool, evpool := types.MockMempool{}, types.MockEvidencePool{}
mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,


+ 2
- 2
consensus/replay_test.go View File

@ -263,8 +263,8 @@ const (
)
var (
mempool = types.MockMempool{}
evpool = types.MockEvidencePool{}
mempool = sm.MockMempool{}
evpool = sm.MockEvidencePool{}
)
//---------------------------------------


+ 4
- 4
consensus/state.go View File

@ -76,9 +76,9 @@ type ConsensusState struct {
// services for creating and executing blocks
// TODO: encapsulate all of this in one "BlockManager"
blockExec *sm.BlockExecutor
blockStore types.BlockStore
mempool types.Mempool
evpool types.EvidencePool
blockStore sm.BlockStore
mempool sm.Mempool
evpool sm.EvidencePool
// internal state
mtx sync.Mutex
@ -118,7 +118,7 @@ type ConsensusState struct {
}
// NewConsensusState returns a new ConsensusState.
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mempool sm.Mempool, evpool sm.EvidencePool) *ConsensusState {
cs := &ConsensusState{
config: config,
blockExec: blockExec,


+ 2
- 2
consensus/wal_generator.go View File

@ -65,8 +65,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
return nil, errors.Wrap(err, "failed to start event bus")
}
defer eventBus.Stop()
mempool := types.MockMempool{}
evpool := types.MockEvidencePool{}
mempool := sm.MockMempool{}
evpool := sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger)


+ 53
- 21
evidence/pool.go View File

@ -4,6 +4,7 @@ import (
"fmt"
"sync"
clist "github.com/tendermint/tmlibs/clist"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
@ -17,6 +18,7 @@ type EvidencePool struct {
logger log.Logger
evidenceStore *EvidenceStore
evidenceList *clist.CList // concurrent linked-list of evidence
// needed to load validators to verify evidence
stateDB dbm.DB
@ -24,9 +26,6 @@ type EvidencePool struct {
// latest state
mtx sync.Mutex
state sm.State
// never close
evidenceChan chan types.Evidence
}
func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool {
@ -35,21 +34,24 @@ func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool
state: sm.LoadState(stateDB),
logger: log.NewNopLogger(),
evidenceStore: evidenceStore,
evidenceChan: make(chan types.Evidence),
evidenceList: clist.New(),
}
return evpool
}
func (evpool *EvidencePool) EvidenceFront() *clist.CElement {
return evpool.evidenceList.Front()
}
func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} {
return evpool.evidenceList.WaitChan()
}
// SetLogger sets the Logger.
func (evpool *EvidencePool) SetLogger(l log.Logger) {
evpool.logger = l
}
// EvidenceChan returns an unbuffered channel on which new evidence can be received.
func (evpool *EvidencePool) EvidenceChan() <-chan types.Evidence {
return evpool.evidenceChan
}
// PriorityEvidence returns the priority evidence.
func (evpool *EvidencePool) PriorityEvidence() []types.Evidence {
return evpool.evidenceStore.PriorityEvidence()
@ -68,22 +70,23 @@ func (evpool *EvidencePool) State() sm.State {
}
// Update loads the latest
func (evpool *EvidencePool) Update(block *types.Block) {
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
func (evpool *EvidencePool) Update(block *types.Block, state sm.State) {
state := sm.LoadState(evpool.stateDB)
// sanity check
if state.LastBlockHeight != block.Height {
panic(fmt.Sprintf("EvidencePool.Update: loaded state with height %d when block.Height=%d", state.LastBlockHeight, block.Height))
panic(fmt.Sprintf("Failed EvidencePool.Update sanity check: got state.Height=%d with block.Height=%d", state.LastBlockHeight, block.Height))
}
// update the state
evpool.mtx.Lock()
evpool.state = state
evpool.mtx.Unlock()
// NOTE: shouldn't need the mutex
evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
// remove evidence from pending and mark committed
evpool.MarkEvidenceAsCommitted(block.Height, block.Evidence.Evidence)
}
// AddEvidence checks the evidence is valid and adds it to the pool.
// Blocks on the EvidenceChan.
func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
// TODO: check if we already have evidence for this
@ -107,14 +110,43 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence)
// never closes. always safe to send on
evpool.evidenceChan <- evidence
// add evidence to clist
evpool.evidenceList.PushBack(evidence)
return nil
}
// MarkEvidenceAsCommitted marks all the evidence as committed.
func (evpool *EvidencePool) MarkEvidenceAsCommitted(evidence []types.Evidence) {
// MarkEvidenceAsCommitted marks all the evidence as committed and removes it from the queue.
func (evpool *EvidencePool) MarkEvidenceAsCommitted(height int64, evidence []types.Evidence) {
// make a map of committed evidence to remove from the clist
blockEvidenceMap := make(map[string]struct{})
for _, ev := range evidence {
evpool.evidenceStore.MarkEvidenceAsCommitted(ev)
blockEvidenceMap[evMapKey(ev)] = struct{}{}
}
// remove committed evidence from the clist
maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge
evpool.removeEvidence(height, maxAge, blockEvidenceMap)
}
func (evpool *EvidencePool) removeEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) {
for e := evpool.evidenceList.Front(); e != nil; e = e.Next() {
ev := e.Value.(types.Evidence)
// Remove the evidence if it's already in a block
// or if it's now too old.
if _, ok := blockEvidenceMap[evMapKey(ev)]; ok ||
ev.Height() < height-maxAge {
// remove from clist
evpool.evidenceList.Remove(e)
e.DetachPrev()
}
}
}
func evMapKey(ev types.Evidence) string {
return string(ev.Hash())
}

+ 9
- 11
evidence/pool_test.go View File

@ -45,7 +45,6 @@ func initializeValidatorState(valAddr []byte, height int64) dbm.DB {
}
func TestEvidencePool(t *testing.T) {
assert := assert.New(t)
valAddr := []byte("val1")
height := int64(5)
@ -56,26 +55,25 @@ func TestEvidencePool(t *testing.T) {
goodEvidence := types.NewMockGoodEvidence(height, 0, valAddr)
badEvidence := types.MockBadEvidence{goodEvidence}
// bad evidence
err := pool.AddEvidence(badEvidence)
assert.NotNil(err)
assert.NotNil(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-pool.EvidenceChan()
<-pool.EvidenceWaitChan()
wg.Done()
}()
err = pool.AddEvidence(goodEvidence)
assert.Nil(err)
assert.Nil(t, err)
wg.Wait()
// if we send it again it wont fire on the chan
assert.Equal(t, 1, pool.evidenceList.Len())
// if we send it again, it shouldnt change the size
err = pool.AddEvidence(goodEvidence)
assert.Nil(err)
select {
case <-pool.EvidenceChan():
t.Fatal("unexpected read on EvidenceChan")
default:
}
assert.Nil(t, err)
assert.Equal(t, 1, pool.evidenceList.Len())
}

+ 90
- 31
evidence/reactor.go View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/tendermint/go-amino"
clist "github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/p2p"
@ -15,8 +16,10 @@ import (
const (
EvidenceChannel = byte(0x38)
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
)
// EvidenceReactor handles evpool evidence broadcasting amongst peers.
@ -43,11 +46,7 @@ func (evR *EvidenceReactor) SetLogger(l log.Logger) {
// OnStart implements cmn.Service
func (evR *EvidenceReactor) OnStart() error {
if err := evR.BaseReactor.OnStart(); err != nil {
return err
}
go evR.broadcastRoutine()
return nil
return evR.BaseReactor.OnStart()
}
// GetChannels implements Reactor.
@ -63,14 +62,7 @@ func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
// send the peer our high-priority evidence.
// the rest will be sent by the broadcastRoutine
evidences := evR.evpool.PriorityEvidence()
msg := &EvidenceListMessage{evidences}
success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
// TODO: remove peer ?
}
go evR.broadcastEvidenceRoutine(peer)
}
// RemovePeer implements Reactor.
@ -109,30 +101,97 @@ func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
}
// Broadcast new evidence to all peers.
// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
func (evR *EvidenceReactor) broadcastRoutine() {
ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routien that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
var next *clist.CElement
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
continue
}
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
ev := next.Value.(types.Evidence)
msg, retry := evR.checkSendEvidenceMessage(peer, ev)
if msg != nil {
success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
retry = !success
}
if retry {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case evidence := <-evR.evpool.EvidenceChan():
// broadcast some new evidence
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
// TODO: Broadcast runs asynchronously, so this should wait on the successChan
// in another routine before marking to be proper.
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
case <-ticker.C:
// broadcast all pending evidence
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
// Returns the message to send the peer, or nil if the evidence is invalid for the peer.
// If message is nil, return true if we should sleep and try again.
func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) {
// make sure the peer is up to date
evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
evR.Logger.Info("Found peer without PeerState", "peer", peer)
return nil, true
}
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge
peerHeight := peerState.GetHeight()
if peerHeight < evHeight {
// peer is behind. sleep while he catches up
return nil, true
} else if peerHeight > evHeight+maxAge {
// evidence is too old, skip
// NOTE: if evidence is too old for an honest peer,
// then we're behind and either it already got committed or it never will!
evR.Logger.Info("Not sending peer old evidence", "peerHeight", peerHeight, "evHeight", evHeight, "maxAge", maxAge, "peer", peer)
return nil, false
}
// send evidence
msg = &EvidenceListMessage{[]types.Evidence{ev}}
return msg, false
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
}
//-----------------------------------------------------------------------------
// Messages


+ 47
- 2
evidence/reactor_test.go View File

@ -84,7 +84,7 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList,
}
reapedEv := evpool.PendingEvidence()
// put the reaped evidence is a map so we can quickly check we got everything
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range reapedEv {
evMap[string(e.Hash())] = e
@ -95,6 +95,7 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList,
fmt.Sprintf("evidence at index %d on reactor %d don't match: %v vs %v",
i, reactorIdx, expectedEv, gotEv))
}
wg.Done()
}
@ -110,7 +111,7 @@ func sendEvidence(t *testing.T, evpool *EvidencePool, valAddr []byte, n int) typ
}
var (
NUM_EVIDENCE = 1
NUM_EVIDENCE = 10
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
)
@ -130,8 +131,52 @@ func TestReactorBroadcastEvidence(t *testing.T) {
// make reactors from statedb
reactors := makeAndConnectEvidenceReactors(config, stateDBs)
// set the peer height on each reactor
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
ps := peerState{height}
peer.Set(types.PeerStateKey, ps)
}
}
// send a bunch of valid evidence to the first reactor's evpool
// and wait for them all to be received in the others
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
waitForEvidence(t, evList, reactors)
}
type peerState struct {
height int64
}
func (ps peerState) GetHeight() int64 {
return ps.height
}
func TestReactorSelectiveBroadcast(t *testing.T) {
config := cfg.TestConfig()
valAddr := []byte("myval")
height1 := int64(NUM_EVIDENCE) + 10
height2 := int64(NUM_EVIDENCE) / 2
// DB1 is ahead of DB2
stateDB1 := initializeValidatorState(valAddr, height1)
stateDB2 := initializeValidatorState(valAddr, height2)
// make reactors from statedb
reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2})
peer := reactors[0].Switch.Peers().List()[0]
ps := peerState{height2}
peer.Set(types.PeerStateKey, ps)
// send a bunch of valid evidence to the first reactor's evpool
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
// only ones less than the peers height should make it through
waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2])
// peers should still be connected
peers := reactors[1].Switch.Peers().List()
assert.Equal(t, 1, len(peers))
}

+ 1
- 5
evidence/store.go View File

@ -17,10 +17,6 @@ Impl:
- First commit atomically in outqueue, pending, lookup.
- Once broadcast, remove from outqueue. No need to sync
- Once committed, atomically remove from pending and update lookup.
- TODO: If we crash after committed but before removing/updating,
we'll be stuck broadcasting evidence we never know we committed.
so either share the state db and atomically MarkCommitted
with ApplyBlock, or check all outqueue/pending on Start to see if its committed
Schema for indexing evidence (note you need both height and hash to find a piece of evidence):
@ -164,7 +160,7 @@ func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) {
store.db.Delete(key)
}
// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed.
// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed.
func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) {
// if its committed, its been broadcast
store.MarkEvidenceAsBroadcasted(evidence)


+ 6
- 6
rpc/core/pipe.go View File

@ -51,9 +51,9 @@ var (
// interfaces defined in types and above
stateDB dbm.DB
blockStore types.BlockStore
mempool types.Mempool
evidencePool types.EvidencePool
blockStore sm.BlockStore
mempool sm.Mempool
evidencePool sm.EvidencePool
consensusState Consensus
p2pSwitch P2P
@ -72,15 +72,15 @@ func SetStateDB(db dbm.DB) {
stateDB = db
}
func SetBlockStore(bs types.BlockStore) {
func SetBlockStore(bs sm.BlockStore) {
blockStore = bs
}
func SetMempool(mem types.Mempool) {
func SetMempool(mem sm.Mempool) {
mempool = mem
}
func SetEvidencePool(evpool types.EvidencePool) {
func SetEvidencePool(evpool sm.EvidencePool) {
evidencePool = evpool
}


+ 30
- 32
state/execution.go View File

@ -29,8 +29,8 @@ type BlockExecutor struct {
eventBus types.BlockEventPublisher
// update these with block results after commit
mempool types.Mempool
evpool types.EvidencePool
mempool Mempool
evpool EvidencePool
logger log.Logger
}
@ -38,7 +38,7 @@ type BlockExecutor struct {
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
mempool Mempool, evpool EvidencePool) *BlockExecutor {
return &BlockExecutor{
db: db,
proxyApp: proxyApp,
@ -59,8 +59,8 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher)
// If the block is invalid, it returns an error.
// Validation does not mutate state, but does require historical information from the stateDB,
// ie. to verify evidence from a validator at an old height.
func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error {
return validateBlock(blockExec.db, s, block)
func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) error {
return validateBlock(blockExec.db, state, block)
}
// ApplyBlock validates the block against the state, executes it against the app,
@ -68,15 +68,15 @@ func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error
// It's the only function that needs to be called
// from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
if err := blockExec.ValidateBlock(s, block); err != nil {
return s, ErrInvalidBlock(err)
if err := blockExec.ValidateBlock(state, block); err != nil {
return state, ErrInvalidBlock(err)
}
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block)
if err != nil {
return s, ErrProxyAppConn(err)
return state, ErrProxyAppConn(err)
}
fail.Fail() // XXX
@ -87,35 +87,33 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
fail.Fail() // XXX
// update the state with the block and responses
s, err = updateState(s, blockID, block.Header, abciResponses)
state, err = updateState(state, blockID, block.Header, abciResponses)
if err != nil {
return s, fmt.Errorf("Commit failed for application: %v", err)
return state, fmt.Errorf("Commit failed for application: %v", err)
}
// lock mempool, commit state, update mempoool
// lock mempool, commit app state, update mempoool
appHash, err := blockExec.Commit(block)
if err != nil {
return s, fmt.Errorf("Commit failed for application: %v", err)
return state, fmt.Errorf("Commit failed for application: %v", err)
}
// Update evpool with the block and state.
blockExec.evpool.Update(block, state)
fail.Fail() // XXX
// update the app hash and save the state
s.AppHash = appHash
SaveState(blockExec.db, s)
state.AppHash = appHash
SaveState(blockExec.db, state)
fail.Fail() // XXX
// Update evpool now that state is saved
// TODO: handle the crash/recover scenario
// ie. (may need to call Update for last block)
blockExec.evpool.Update(block)
// events are fired after everything else
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
return s, nil
return state, nil
}
// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
@ -283,20 +281,20 @@ func updateValidators(currentSet *types.ValidatorSet, updates []abci.Validator)
}
// updateState returns a new State updated according to the header and responses.
func updateState(s State, blockID types.BlockID, header *types.Header,
func updateState(state State, blockID types.BlockID, header *types.Header,
abciResponses *ABCIResponses) (State, error) {
// copy the valset so we can apply changes from EndBlock
// and update s.LastValidators and s.Validators
prevValSet := s.Validators.Copy()
prevValSet := state.Validators.Copy()
nextValSet := prevValSet.Copy()
// update the validator set with the latest abciResponses
lastHeightValsChanged := s.LastHeightValidatorsChanged
lastHeightValsChanged := state.LastHeightValidatorsChanged
if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
if err != nil {
return s, fmt.Errorf("Error changing validator set: %v", err)
return state, fmt.Errorf("Error changing validator set: %v", err)
}
// change results from this height but only applies to the next height
lastHeightValsChanged = header.Height + 1
@ -306,14 +304,14 @@ func updateState(s State, blockID types.BlockID, header *types.Header,
nextValSet.IncrementAccum(1)
// update the params with the latest abciResponses
nextParams := s.ConsensusParams
lastHeightParamsChanged := s.LastHeightConsensusParamsChanged
nextParams := state.ConsensusParams
lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
if abciResponses.EndBlock.ConsensusParamUpdates != nil {
// NOTE: must not mutate s.ConsensusParams
nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
err := nextParams.Validate()
if err != nil {
return s, fmt.Errorf("Error updating consensus params: %v", err)
return state, fmt.Errorf("Error updating consensus params: %v", err)
}
// change results from this height but only applies to the next height
lastHeightParamsChanged = header.Height + 1
@ -322,13 +320,13 @@ func updateState(s State, blockID types.BlockID, header *types.Header,
// NOTE: the AppHash has not been populated.
// It will be filled on state.Save.
return State{
ChainID: s.ChainID,
ChainID: state.ChainID,
LastBlockHeight: header.Height,
LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs,
LastBlockTotalTx: state.LastBlockTotalTx + header.NumTxs,
LastBlockID: blockID,
LastBlockTime: header.Time,
Validators: nextValSet,
LastValidators: s.Validators.Copy(),
LastValidators: state.Validators.Copy(),
LastHeightValidatorsChanged: lastHeightValsChanged,
ConsensusParams: nextParams,
LastHeightConsensusParamsChanged: lastHeightParamsChanged,


+ 4
- 3
state/execution_test.go View File

@ -10,11 +10,12 @@ import (
"github.com/tendermint/abci/example/kvstore"
abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
var (
@ -34,7 +35,7 @@ func TestApplyBlock(t *testing.T) {
state, stateDB := state(), dbm.NewMemDB()
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
types.MockMempool{}, types.MockEvidencePool{})
MockMempool{}, MockEvidencePool{})
block := makeBlock(state, 1)
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}


types/services.go → state/services.go View File


+ 31
- 31
state/state.go View File

@ -55,67 +55,67 @@ type State struct {
}
// Copy makes a copy of the State for mutating.
func (s State) Copy() State {
func (state State) Copy() State {
return State{
ChainID: s.ChainID,
ChainID: state.ChainID,
LastBlockHeight: s.LastBlockHeight,
LastBlockTotalTx: s.LastBlockTotalTx,
LastBlockID: s.LastBlockID,
LastBlockTime: s.LastBlockTime,
LastBlockHeight: state.LastBlockHeight,
LastBlockTotalTx: state.LastBlockTotalTx,
LastBlockID: state.LastBlockID,
LastBlockTime: state.LastBlockTime,
Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(),
LastHeightValidatorsChanged: s.LastHeightValidatorsChanged,
Validators: state.Validators.Copy(),
LastValidators: state.LastValidators.Copy(),
LastHeightValidatorsChanged: state.LastHeightValidatorsChanged,
ConsensusParams: s.ConsensusParams,
LastHeightConsensusParamsChanged: s.LastHeightConsensusParamsChanged,
ConsensusParams: state.ConsensusParams,
LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
AppHash: s.AppHash,
AppHash: state.AppHash,
LastResultsHash: s.LastResultsHash,
LastResultsHash: state.LastResultsHash,
}
}
// Equals returns true if the States are identical.
func (s State) Equals(s2 State) bool {
sbz, s2bz := s.Bytes(), s2.Bytes()
func (state State) Equals(state2 State) bool {
sbz, s2bz := state.Bytes(), state2.Bytes()
return bytes.Equal(sbz, s2bz)
}
// Bytes serializes the State using go-amino.
func (s State) Bytes() []byte {
return cdc.MustMarshalBinaryBare(s)
func (state State) Bytes() []byte {
return cdc.MustMarshalBinaryBare(state)
}
// IsEmpty returns true if the State is equal to the empty State.
func (s State) IsEmpty() bool {
return s.Validators == nil // XXX can't compare to Empty
func (state State) IsEmpty() bool {
return state.Validators == nil // XXX can't compare to Empty
}
// GetValidators returns the last and current validator sets.
func (s State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) {
return s.LastValidators, s.Validators
func (state State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) {
return state.LastValidators, state.Validators
}
//------------------------------------------------------------------------
// Create a block from the latest state
// MakeBlock builds a block with the given txs and commit from the current state.
func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) {
func (state State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) {
// build base block
block := types.MakeBlock(height, txs, commit)
// fill header with state data
block.ChainID = s.ChainID
block.TotalTxs = s.LastBlockTotalTx + block.NumTxs
block.LastBlockID = s.LastBlockID
block.ValidatorsHash = s.Validators.Hash()
block.AppHash = s.AppHash
block.ConsensusHash = s.ConsensusParams.Hash()
block.LastResultsHash = s.LastResultsHash
return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes)
block.ChainID = state.ChainID
block.TotalTxs = state.LastBlockTotalTx + block.NumTxs
block.LastBlockID = state.LastBlockID
block.ValidatorsHash = state.Validators.Hash()
block.AppHash = state.AppHash
block.ConsensusHash = state.ConsensusParams.Hash()
block.LastResultsHash = state.LastResultsHash
return block, block.MakePartSet(state.ConsensusParams.BlockGossip.BlockPartSizeBytes)
}
//------------------------------------------------------------------------


+ 7
- 7
state/store.go View File

@ -80,15 +80,15 @@ func loadState(db dbm.DB, key []byte) (state State) {
}
// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
func SaveState(db dbm.DB, s State) {
saveState(db, s, stateKey)
func SaveState(db dbm.DB, state State) {
saveState(db, state, stateKey)
}
func saveState(db dbm.DB, s State, key []byte) {
nextHeight := s.LastBlockHeight + 1
saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators)
saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams)
db.SetSync(stateKey, s.Bytes())
func saveState(db dbm.DB, state State, key []byte) {
nextHeight := state.LastBlockHeight + 1
saveValidatorsInfo(db, nextHeight, state.LastHeightValidatorsChanged, state.Validators)
saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams)
db.SetSync(stateKey, state.Bytes())
}
//------------------------------------------------------------------------


+ 32
- 32
state/validation.go View File

@ -12,69 +12,69 @@ import (
//-----------------------------------------------------
// Validate block
func validateBlock(stateDB dbm.DB, s State, b *types.Block) error {
func validateBlock(stateDB dbm.DB, state State, block *types.Block) error {
// validate internal consistency
if err := b.ValidateBasic(); err != nil {
if err := block.ValidateBasic(); err != nil {
return err
}
// validate basic info
if b.ChainID != s.ChainID {
return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID)
if block.ChainID != state.ChainID {
return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", state.ChainID, block.ChainID)
}
if b.Height != s.LastBlockHeight+1 {
return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height)
if block.Height != state.LastBlockHeight+1 {
return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", state.LastBlockHeight+1, block.Height)
}
/* TODO: Determine bounds for Time
See blockchain/reactor "stopSyncingDurationMinutes"
if !b.Time.After(lastBlockTime) {
if !block.Time.After(lastBlockTime) {
return errors.New("Invalid Block.Header.Time")
}
*/
// validate prev block info
if !b.LastBlockID.Equals(s.LastBlockID) {
return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID)
if !block.LastBlockID.Equals(state.LastBlockID) {
return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", state.LastBlockID, block.LastBlockID)
}
newTxs := int64(len(b.Data.Txs))
if b.TotalTxs != s.LastBlockTotalTx+newTxs {
return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs)
newTxs := int64(len(block.Data.Txs))
if block.TotalTxs != state.LastBlockTotalTx+newTxs {
return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", state.LastBlockTotalTx+newTxs, block.TotalTxs)
}
// validate app info
if !bytes.Equal(b.AppHash, s.AppHash) {
return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash)
if !bytes.Equal(block.AppHash, state.AppHash) {
return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", state.AppHash, block.AppHash)
}
if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) {
return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash)
if !bytes.Equal(block.ConsensusHash, state.ConsensusParams.Hash()) {
return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", state.ConsensusParams.Hash(), block.ConsensusHash)
}
if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) {
return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash)
if !bytes.Equal(block.LastResultsHash, state.LastResultsHash) {
return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", state.LastResultsHash, block.LastResultsHash)
}
if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) {
return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash)
if !bytes.Equal(block.ValidatorsHash, state.Validators.Hash()) {
return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", state.Validators.Hash(), block.ValidatorsHash)
}
// Validate block LastCommit.
if b.Height == 1 {
if len(b.LastCommit.Precommits) != 0 {
if block.Height == 1 {
if len(block.LastCommit.Precommits) != 0 {
return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
}
} else {
if len(b.LastCommit.Precommits) != s.LastValidators.Size() {
if len(block.LastCommit.Precommits) != state.LastValidators.Size() {
return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(b.LastCommit.Precommits))
state.LastValidators.Size(), len(block.LastCommit.Precommits))
}
err := s.LastValidators.VerifyCommit(
s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit)
err := state.LastValidators.VerifyCommit(
state.ChainID, state.LastBlockID, block.Height-1, block.LastCommit)
if err != nil {
return err
}
}
for _, ev := range b.Evidence.Evidence {
if err := VerifyEvidence(stateDB, s, ev); err != nil {
for _, ev := range block.Evidence.Evidence {
if err := VerifyEvidence(stateDB, state, ev); err != nil {
return types.NewEvidenceInvalidErr(ev, err)
}
}
@ -87,17 +87,17 @@ func validateBlock(stateDB dbm.DB, s State, b *types.Block) error {
// VerifyEvidence verifies the evidence fully by checking it is internally
// consistent and sufficiently recent.
func VerifyEvidence(stateDB dbm.DB, s State, evidence types.Evidence) error {
height := s.LastBlockHeight
func VerifyEvidence(stateDB dbm.DB, state State, evidence types.Evidence) error {
height := state.LastBlockHeight
evidenceAge := height - evidence.Height()
maxAge := s.ConsensusParams.EvidenceParams.MaxAge
maxAge := state.ConsensusParams.EvidenceParams.MaxAge
if evidenceAge > maxAge {
return fmt.Errorf("Evidence from height %d is too old. Min height is %d",
evidence.Height(), height-maxAge)
}
if err := evidence.Verify(s.ChainID); err != nil {
if err := evidence.Verify(state.ChainID); err != nil {
return err
}


Loading…
Cancel
Save