Browse Source

Merge pull request #265 from tendermint/handshake

Handshake
pull/317/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
3206e101f5
25 changed files with 986 additions and 197 deletions
  1. +2
    -2
      consensus/mempool_test.go
  2. +1
    -6
      consensus/reactor.go
  3. +39
    -75
      consensus/state.go
  4. +8
    -6
      glide.lock
  5. +1
    -2
      mempool/mempool.go
  6. +10
    -20
      node/node.go
  7. +9
    -5
      proxy/app_conn.go
  8. +3
    -3
      proxy/app_conn_test.go
  9. +22
    -14
      proxy/multi_app_conn.go
  10. +0
    -9
      proxy/state.go
  11. +2
    -2
      rpc/core/tmsp.go
  12. +4
    -1
      rpc/core/types/responses.go
  13. +55
    -0
      state/errors.go
  14. +297
    -42
      state/execution.go
  15. +193
    -0
      state/execution_test.go
  16. +51
    -6
      state/state.go
  17. +42
    -0
      state/state_test.go
  18. +68
    -0
      test/persist/test.sh
  19. +104
    -0
      test/persist/test2.sh
  20. +3
    -0
      test/run_test.sh
  21. +21
    -0
      types/block.go
  22. +3
    -1
      types/events.go
  23. +44
    -0
      types/protobuf.go
  24. +3
    -2
      types/validator.go
  25. +1
    -1
      types/validator_set.go

+ 2
- 2
consensus/mempool_test.go View File

@ -122,8 +122,8 @@ func NewCounterApplication() *CounterApplication {
return &CounterApplication{} return &CounterApplication{}
} }
func (app *CounterApplication) Info() string {
return Fmt("txs:%v", app.txCount)
func (app *CounterApplication) Info() (string, *tmsp.TMSPInfo, *tmsp.LastBlockInfo, *tmsp.ConfigInfo) {
return Fmt("txs:%v", app.txCount), nil, nil, nil
} }
func (app *CounterApplication) SetOption(key string, value string) (log string) { func (app *CounterApplication) SetOption(key string, value string) (log string) {


+ 1
- 6
consensus/reactor.go View File

@ -658,12 +658,7 @@ OUTER_LOOP:
{ {
prs := ps.GetRoundState() prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() { if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
var commit *types.Commit
if prs.Height == conR.conS.blockStore.Height() {
commit = conR.conS.blockStore.LoadSeenCommit(prs.Height)
} else {
commit = conR.conS.blockStore.LoadBlockCommit(prs.Height)
}
commit := conR.conS.LoadCommit(prs.Height)
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: commit.Round(), Round: commit.Round(),


+ 39
- 75
consensus/state.go View File

@ -8,6 +8,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/ebuchman/fail-test"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
@ -317,6 +319,15 @@ func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) {
cs.privValidator = priv cs.privValidator = priv
} }
func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if height == cs.blockStore.Height() {
return cs.blockStore.LoadSeenCommit(height)
}
return cs.blockStore.LoadBlockCommit(height)
}
func (cs *ConsensusState) OnStart() error { func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart() cs.BaseService.OnStart()
@ -930,25 +941,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Mempool validated transactions // Mempool validated transactions
txs := cs.mempool.Reap(cs.config.GetInt("block_size")) txs := cs.mempool.Reap(cs.config.GetInt("block_size"))
block = &types.Block{
Header: &types.Header{
ChainID: cs.state.ChainID,
Height: cs.Height,
Time: time.Now(),
NumTxs: len(txs),
LastBlockID: cs.state.LastBlockID,
ValidatorsHash: cs.state.Validators.Hash(),
AppHash: cs.state.AppHash, // state merkle root of txs from the previous block.
},
LastCommit: commit,
Data: &types.Data{
Txs: txs,
},
}
block.FillHeader()
blockParts = block.MakePartSet(cs.config.GetInt("block_part_size"))
return block, blockParts
return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit,
cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.GetInt("block_part_size"))
} }
// Enter: `timeoutPropose` after entering Propose. // Enter: `timeoutPropose` after entering Propose.
@ -1251,50 +1245,46 @@ func (cs *ConsensusState) finalizeCommit(height int) {
PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err))
} }
log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs), "height", block.Height, "hash", block.Hash())
log.Notice(Fmt("Finalizing commit of block with %d txs", block.NumTxs),
"height", block.Height, "hash", block.Hash(), "root", block.AppHash)
log.Info(Fmt("%v", block)) log.Info(Fmt("%v", block))
// Fire off event for new block.
// TODO: Handle app failure. See #177
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
fail.Fail() // XXX
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
} else {
log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height)
}
fail.Fail() // XXX
// Create a copy of the state for staging // Create a copy of the state for staging
// and an event cache for txs
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
// event cache for txs
eventCache := types.NewEventCache(cs.evsw) eventCache := types.NewEventCache(cs.evsw)
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header())
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application: %v", err))
}
// Execute and commit the block, and update the mempool.
// All calls to the proxyAppConn should come here.
// NOTE: the block.AppHash wont reflect these txs until the next block
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
// lock mempool, commit state, update mempoool
err = cs.commitStateUpdateMempool(stateCopy, block)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application: %v", err))
}
fail.Fail() // XXX
// txs committed, bad ones removed from mepool; fire events
// NOTE: the block.AppHash wont reflect these txs until the next block
// Fire off event for new block.
// TODO: Handle app failure. See #177
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
eventCache.Flush() eventCache.Flush()
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
}
// Save the state. // Save the state.
stateCopy.Save() stateCopy.Save()
fail.Fail() // XXX
// NewHeightStep! // NewHeightStep!
cs.updateToState(stateCopy) cs.updateToState(stateCopy)
@ -1309,32 +1299,6 @@ func (cs *ConsensusState) finalizeCommit(height int) {
return return
} }
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid
func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error {
cs.mempool.Lock()
defer cs.mempool.Unlock()
// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
cs.mempool.Update(block.Height, block.Txs)
return nil
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {


+ 8
- 6
glide.lock View File

@ -1,14 +1,16 @@
hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484 hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484
updated: 2016-11-15T15:54:25.75591193-05:00
updated: 2016-11-16T16:25:10.693961906-05:00
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: d9a674e1b7bc09d0830d6986c71cf5f535d753c3
version: b134beb3b7809de6370a93cc5f6a684d6942e2e8
subpackages: subpackages:
- btcec - btcec
- name: github.com/btcsuite/fastsha256 - name: github.com/btcsuite/fastsha256
version: 637e656429416087660c84436a2a035d69d54e2e version: 637e656429416087660c84436a2a035d69d54e2e
- name: github.com/BurntSushi/toml - name: github.com/BurntSushi/toml
version: 99064174e013895bbd9b025c31100bd1d9b590ca version: 99064174e013895bbd9b025c31100bd1d9b590ca
- name: github.com/ebuchman/fail-test
version: c1eddaa09da2b4017351245b0d43234955276798
- name: github.com/go-stack/stack - name: github.com/go-stack/stack
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
- name: github.com/gogo/protobuf - name: github.com/gogo/protobuf
@ -16,7 +18,7 @@ imports:
subpackages: subpackages:
- proto - proto
- name: github.com/golang/protobuf - name: github.com/golang/protobuf
version: da116c3771bf4a398a43f44e069195ef1c9688ef
version: 224aaba33b1ac32a92a165f27489409fb8133d08
subpackages: subpackages:
- proto - proto
- name: github.com/golang/snappy - name: github.com/golang/snappy
@ -92,7 +94,7 @@ imports:
subpackages: subpackages:
- term - term
- name: github.com/tendermint/tmsp - name: github.com/tendermint/tmsp
version: eece35eeebacee1ab94b8338e77e0d1c2d880ecc
version: 0bdb3b887e70b1ef16d32eece0248ec071fd8490
subpackages: subpackages:
- client - client
- example/counter - example/counter
@ -112,7 +114,7 @@ imports:
- ripemd160 - ripemd160
- salsa20/salsa - salsa20/salsa
- name: golang.org/x/net - name: golang.org/x/net
version: cac22060de4e495155959e69adcb4b45763ccb10
version: 4971afdc2f162e82d185353533d3cf16188a9f4e
subpackages: subpackages:
- context - context
- http2 - http2
@ -126,7 +128,7 @@ imports:
subpackages: subpackages:
- unix - unix
- name: google.golang.org/grpc - name: google.golang.org/grpc
version: 0d9891286aca15aeb2b0a73be9f5946c3cfefa85
version: 941cc894cea3c87a12943fd12b594964541b6d28
subpackages: subpackages:
- codes - codes
- credentials - credentials


+ 1
- 2
mempool/mempool.go View File

@ -282,8 +282,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
// NOTE: this should be called *after* block is committed by consensus. // NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller // NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int, txs []types.Tx) { func (mem *Mempool) Update(height int, txs []types.Tx) {
// mem.proxyMtx.Lock()
// defer mem.proxyMtx.Unlock()
// TODO: check err ?
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx
// First, create a lookup map of txns in new txs. // First, create a lookup map of txns in new txs.


+ 10
- 20
node/node.go View File

@ -60,11 +60,10 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
// Get State // Get State
state := getState(config, stateDB)
state := sm.GetState(config, stateDB)
// Create the proxyApp, which houses three connections:
// query, consensus, and mempool
proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
// Create the proxyApp, which manages connections (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore))
if _, err := proxyApp.Start(); err != nil { if _, err := proxyApp.Start(); err != nil {
Exit(Fmt("Error starting proxy app connections: %v", err)) Exit(Fmt("Error starting proxy app connections: %v", err))
} }
@ -296,17 +295,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
return nodeInfo return nodeInfo
} }
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
state := sm.LoadState(stateDB)
if state == nil {
state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Users wishing to: // Users wishing to:
@ -391,17 +379,19 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir")) stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
// Create proxyAppConn connection (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore))
_, err := proxyApp.Start()
if err != nil {
Exit(Fmt("Error starting proxy app conns: %v", err))
}
// add the chainid to the global config // add the chainid to the global config
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)
// Make event switch // Make event switch
eventSwitch := types.NewEventSwitch() eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
if _, err := eventSwitch.Start(); err != nil {
Exit(Fmt("Failed to start event switch: %v", err)) Exit(Fmt("Failed to start event switch: %v", err))
} }


+ 9
- 5
proxy/app_conn.go View File

@ -14,7 +14,7 @@ type AppConnConsensus interface {
InitChainSync(validators []*types.Validator) (err error) InitChainSync(validators []*types.Validator) (err error)
BeginBlockSync(height uint64) (err error)
BeginBlockSync(hash []byte, header *types.Header) (err error)
AppendTxAsync(tx []byte) *tmspcli.ReqRes AppendTxAsync(tx []byte) *tmspcli.ReqRes
EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error)
CommitSync() (res types.Result) CommitSync() (res types.Result)
@ -34,7 +34,7 @@ type AppConnQuery interface {
Error() error Error() error
EchoSync(string) (res types.Result) EchoSync(string) (res types.Result)
InfoSync() (res types.Result)
InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo)
QuerySync(tx []byte) (res types.Result) QuerySync(tx []byte) (res types.Result)
// SetOptionSync(key string, value string) (res types.Result) // SetOptionSync(key string, value string) (res types.Result)
@ -56,15 +56,19 @@ func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) { func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb) app.appConn.SetResponseCallback(cb)
} }
func (app *appConnConsensus) Error() error { func (app *appConnConsensus) Error() error {
return app.appConn.Error() return app.appConn.Error()
} }
func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
return app.appConn.InitChainSync(validators) return app.appConn.InitChainSync(validators)
} }
func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) {
return app.appConn.BeginBlockSync(height)
func (app *appConnConsensus) BeginBlockSync(hash []byte, header *types.Header) (err error) {
return app.appConn.BeginBlockSync(hash, header)
} }
func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.AppendTxAsync(tx) return app.appConn.AppendTxAsync(tx)
} }
@ -131,7 +135,7 @@ func (app *appConnQuery) EchoSync(msg string) (res types.Result) {
return app.appConn.EchoSync(msg) return app.appConn.EchoSync(msg)
} }
func (app *appConnQuery) InfoSync() (res types.Result) {
func (app *appConnQuery) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) {
return app.appConn.InfoSync() return app.appConn.InfoSync()
} }


+ 3
- 3
proxy/app_conn_test.go View File

@ -16,7 +16,7 @@ import (
type AppConnTest interface { type AppConnTest interface {
EchoAsync(string) *tmspcli.ReqRes EchoAsync(string) *tmspcli.ReqRes
FlushSync() error FlushSync() error
InfoSync() (res types.Result)
InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo)
} }
type appConnTest struct { type appConnTest struct {
@ -35,7 +35,7 @@ func (app *appConnTest) FlushSync() error {
return app.appConn.FlushSync() return app.appConn.FlushSync()
} }
func (app *appConnTest) InfoSync() types.Result {
func (app *appConnTest) InfoSync() (types.Result, *types.TMSPInfo, *types.LastBlockInfo, *types.ConfigInfo) {
return app.appConn.InfoSync() return app.appConn.InfoSync()
} }
@ -114,7 +114,7 @@ func TestInfo(t *testing.T) {
proxy := NewAppConnTest(cli) proxy := NewAppConnTest(cli)
t.Log("Connected") t.Log("Connected")
res := proxy.InfoSync()
res, _, _, _ := proxy.InfoSync()
if res.IsErr() { if res.IsErr() {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }


+ 22
- 14
proxy/multi_app_conn.go View File

@ -5,6 +5,8 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
) )
//-----------------------------
// Tendermint's interface to the application consists of multiple connections // Tendermint's interface to the application consists of multiple connections
type AppConns interface { type AppConns interface {
Service Service
@ -14,19 +16,27 @@ type AppConns interface {
Query() AppConnQuery Query() AppConnQuery
} }
func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns {
return NewMultiAppConn(config, clientCreator, state, blockStore)
func NewAppConns(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) AppConns {
return NewMultiAppConn(config, clientCreator, handshaker)
}
//-----------------------------
// multiAppConn implements AppConns
type Handshaker interface {
Handshake(AppConns) error
} }
// a multiAppConn is made of a few appConns (mempool, consensus, query) // a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying tmsp clients, ensuring they reboot together
// and manages their underlying tmsp clients, including the handshake
// which ensures the app and tendermint are synced.
// TODO: on app restart, clients must reboot together
type multiAppConn struct { type multiAppConn struct {
BaseService BaseService
config cfg.Config config cfg.Config
state State
blockStore BlockStore
handshaker Handshaker
mempoolConn *appConnMempool mempoolConn *appConnMempool
consensusConn *appConnConsensus consensusConn *appConnConsensus
@ -36,11 +46,10 @@ type multiAppConn struct {
} }
// Make all necessary tmsp connections to the application // Make all necessary tmsp connections to the application
func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn {
func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, handshaker Handshaker) *multiAppConn {
multiAppConn := &multiAppConn{ multiAppConn := &multiAppConn{
config: config, config: config,
state: state,
blockStore: blockStore,
handshaker: handshaker,
clientCreator: clientCreator, clientCreator: clientCreator,
} }
multiAppConn.BaseService = *NewBaseService(log, "multiAppConn", multiAppConn) multiAppConn.BaseService = *NewBaseService(log, "multiAppConn", multiAppConn)
@ -57,6 +66,7 @@ func (app *multiAppConn) Consensus() AppConnConsensus {
return app.consensusConn return app.consensusConn
} }
// Returns the query Connection
func (app *multiAppConn) Query() AppConnQuery { func (app *multiAppConn) Query() AppConnQuery {
return app.queryConn return app.queryConn
} }
@ -85,11 +95,9 @@ func (app *multiAppConn) OnStart() error {
} }
app.consensusConn = NewAppConnConsensus(concli) app.consensusConn = NewAppConnConsensus(concli)
// TODO: handshake
// TODO: replay blocks
// TODO: (on restart) replay mempool
// ensure app is synced to the latest state
if app.handshaker != nil {
return app.handshaker.Handshake(app)
}
return nil return nil
} }

+ 0
- 9
proxy/state.go View File

@ -1,9 +0,0 @@
package proxy
type State interface {
// TODO
}
type BlockStore interface {
// TODO
}

+ 2
- 2
rpc/core/tmsp.go View File

@ -12,6 +12,6 @@ func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) {
} }
func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { func TMSPInfo() (*ctypes.ResultTMSPInfo, error) {
res := proxyAppQuery.InfoSync()
return &ctypes.ResultTMSPInfo{res}, nil
res, tmspInfo, lastBlockInfo, configInfo := proxyAppQuery.InfoSync()
return &ctypes.ResultTMSPInfo{res, tmspInfo, lastBlockInfo, configInfo}, nil
} }

+ 4
- 1
rpc/core/types/responses.go View File

@ -69,7 +69,10 @@ type ResultUnconfirmedTxs struct {
} }
type ResultTMSPInfo struct { type ResultTMSPInfo struct {
Result tmsp.Result `json:"result"`
Result tmsp.Result `json:"result"`
TMSPInfo *tmsp.TMSPInfo `json:"tmsp_info"`
LastBlockInfo *tmsp.LastBlockInfo `json:"last_block_info"`
ConfigInfo *tmsp.ConfigInfo `json:"config_info"`
} }
type ResultTMSPQuery struct { type ResultTMSPQuery struct {


+ 55
- 0
state/errors.go View File

@ -0,0 +1,55 @@
package state
import (
. "github.com/tendermint/go-common"
)
type (
ErrInvalidBlock error
ErrProxyAppConn error
ErrUnknownBlock struct {
height int
}
ErrBlockHashMismatch struct {
coreHash []byte
appHash []byte
height int
}
ErrAppBlockHeightTooHigh struct {
coreHeight int
appHeight int
}
ErrLastStateMismatch struct {
height int
core []byte
app []byte
}
ErrStateMismatch struct {
got *State
expected *State
}
)
func (e ErrUnknownBlock) Error() string {
return Fmt("Could not find block #%d", e.height)
}
func (e ErrBlockHashMismatch) Error() string {
return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.appHash, e.coreHash, e.height)
}
func (e ErrAppBlockHeightTooHigh) Error() string {
return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight)
}
func (e ErrLastStateMismatch) Error() string {
return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app)
}
func (e ErrStateMismatch) Error() string {
return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected)
}

+ 297
- 42
state/execution.go View File

@ -1,19 +1,20 @@
package state package state
import ( import (
"bytes"
"errors" "errors"
"fmt"
"github.com/ebuchman/fail-test"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types" tmsp "github.com/tendermint/tmsp/types"
) )
// Validate block
func (s *State) ValidateBlock(block *types.Block) error {
return s.validateBlock(block)
}
//--------------------------------------------------
// Execute the block
// Execute the block to mutate State. // Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block. // Validates block and then executes Data.Txs in the block.
@ -22,7 +23,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
// Validate the block. // Validate the block.
err := s.validateBlock(block) err := s.validateBlock(block)
if err != nil { if err != nil {
return err
return ErrInvalidBlock(err)
} }
// Update the validator set // Update the validator set
@ -37,16 +38,17 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
if err != nil { if err != nil {
// There was some error in proxyApp // There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available. // TODO Report error and wait for proxyApp to be available.
return err
return ErrProxyAppConn(err)
} }
// All good! // All good!
// Update validator accums and set state variables
nextValSet.IncrementAccum(1) nextValSet.IncrementAccum(1)
s.LastBlockHeight = block.Height
s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader}
s.LastBlockTime = block.Time
s.Validators = nextValSet
s.LastValidators = valSet
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
// save state with updated height/blockhash/validators
// but stale apphash, in case we fail between Commit and Save
s.Save()
return nil return nil
} }
@ -89,26 +91,34 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
proxyAppConn.SetResponseCallback(proxyCb) proxyAppConn.SetResponseCallback(proxyCb)
// Begin block // Begin block
err := proxyAppConn.BeginBlockSync(uint64(block.Height))
err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
if err != nil { if err != nil {
log.Warn("Error in proxyAppConn.BeginBlock", "error", err) log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
return err return err
} }
fail.Fail() // XXX
// Run txs of block // Run txs of block
for _, tx := range block.Txs { for _, tx := range block.Txs {
fail.FailRand(len(block.Txs)) // XXX
proxyAppConn.AppendTxAsync(tx) proxyAppConn.AppendTxAsync(tx)
if err := proxyAppConn.Error(); err != nil { if err := proxyAppConn.Error(); err != nil {
return err return err
} }
} }
fail.Fail() // XXX
// End block // End block
changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height)) changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height))
if err != nil { if err != nil {
log.Warn("Error in proxyAppConn.EndBlock", "error", err) log.Warn("Error in proxyAppConn.EndBlock", "error", err)
return err return err
} }
fail.Fail() // XXX
// TODO: Do something with changedValidators // TODO: Do something with changedValidators
log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators) log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators)
@ -116,6 +126,41 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
return nil return nil
} }
// Updates the LastCommitHeight of the validators in valSet, in place.
// Assumes that lastValSet matches the valset of block.LastCommit
// CONTRACT: lastValSet is not mutated.
func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.ValidatorSet, block *types.Block) {
for i, precommit := range block.LastCommit.Precommits {
if precommit == nil {
continue
}
_, val := lastValSet.GetByIndex(i)
if val == nil {
PanicCrisis(Fmt("Failed to fetch validator at index %v", i))
}
if _, val_ := valSet.GetByAddress(val.Address); val_ != nil {
val_.LastCommitHeight = block.Height - 1
updated := valSet.Update(val_)
if !updated {
PanicCrisis("Failed to update validator LastCommitHeight")
}
} else {
// XXX This is not an error if validator was removed.
// But, we don't mutate validators yet so go ahead and panic.
PanicCrisis("Could not find validator")
}
}
}
//-----------------------------------------------------
// Validate block
func (s *State) ValidateBlock(block *types.Block) error {
return s.validateBlock(block)
}
func (s *State) validateBlock(block *types.Block) error { func (s *State) validateBlock(block *types.Block) error {
// Basic block validation. // Basic block validation.
err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash) err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash)
@ -130,8 +175,8 @@ func (s *State) validateBlock(block *types.Block) error {
} }
} else { } else {
if len(block.LastCommit.Precommits) != s.LastValidators.Size() { if len(block.LastCommit.Precommits) != s.LastValidators.Size() {
return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(block.LastCommit.Precommits))
return errors.New(Fmt("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(block.LastCommit.Precommits)))
} }
err := s.LastValidators.VerifyCommit( err := s.LastValidators.VerifyCommit(
s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit) s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit)
@ -143,41 +188,251 @@ func (s *State) validateBlock(block *types.Block) error {
return nil return nil
} }
// Updates the LastCommitHeight of the validators in valSet, in place.
// Assumes that lastValSet matches the valset of block.LastCommit
// CONTRACT: lastValSet is not mutated.
func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.ValidatorSet, block *types.Block) {
//-----------------------------------------------------------------------------
// ApplyBlock executes the block, then commits and updates the mempool atomically
for i, precommit := range block.LastCommit.Precommits {
if precommit == nil {
continue
}
_, val := lastValSet.GetByIndex(i)
if val == nil {
PanicCrisis(Fmt("Failed to fetch validator at index %v", i))
// Execute and commit block against app, save block and state
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error {
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
if err != nil {
return errors.New(Fmt("Exec failed for application: %v", err))
}
// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
return errors.New(Fmt("Commit failed for application: %v", err))
}
return nil
}
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error {
mempool.Lock()
defer mempool.Unlock()
// Commit block, get hash back
res := proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
mempool.Update(block.Height, block.Txs)
return nil
}
// Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit
type Mempool interface {
Lock()
Unlock()
Update(height int, txs []types.Tx)
}
type mockMempool struct {
}
func (m mockMempool) Lock() {}
func (m mockMempool) Unlock() {}
func (m mockMempool) Update(height int, txs []types.Tx) {}
//----------------------------------------------------------------
// Handshake with app to sync to latest state of core by replaying blocks
// TODO: Should we move blockchain/store.go to its own package?
type BlockStore interface {
Height() int
LoadBlock(height int) *types.Block
}
type Handshaker struct {
config cfg.Config
state *State
store BlockStore
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker {
return &Handshaker{config, state, store, 0}
}
// TODO: retry the handshake once if it fails the first time
// ... let Info take an argument determining its behaviour
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// handshake is done via info request on the query conn
res, tmspInfo, blockInfo, configInfo := proxyApp.Query().InfoSync()
if res.IsErr() {
return errors.New(Fmt("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log))
}
if blockInfo == nil {
log.Warn("blockInfo is nil, aborting handshake")
return nil
}
log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash)
blockHeight := int(blockInfo.BlockHeight) // safe, should be an int32
blockHash := blockInfo.BlockHash
appHash := blockInfo.AppHash
if tmspInfo != nil {
// TODO: check tmsp version (or do this in the tmspcli?)
_ = tmspInfo
}
// last block (nil if we starting from 0)
var header *types.Header
var partsHeader types.PartSetHeader
// replay all blocks after blockHeight
// if blockHeight == 0, we will replay everything
if blockHeight != 0 {
block := h.store.LoadBlock(blockHeight)
if block == nil {
return ErrUnknownBlock{blockHeight}
} }
if _, val_ := valSet.GetByAddress(val.Address); val_ != nil {
val_.LastCommitHeight = block.Height - 1
updated := valSet.Update(val_)
if !updated {
PanicCrisis("Failed to update validator LastCommitHeight")
}
} else {
// XXX This is not an error if validator was removed.
// But, we don't mutate validators yet so go ahead and panic.
PanicCrisis("Could not find validator")
// check block hash
if !bytes.Equal(block.Hash(), blockHash) {
return ErrBlockHashMismatch{block.Hash(), blockHash, blockHeight}
} }
// NOTE: app hash should be in the next block ...
// check app hash
/*if !bytes.Equal(block.Header.AppHash, appHash) {
return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, block.Header.AppHash)
}*/
header = block.Header
partsHeader = block.MakePartSet(h.config.GetInt("block_part_size")).Header()
}
if configInfo != nil {
// TODO: set config info
_ = configInfo
} }
// replay blocks up to the latest in the blockstore
err := h.ReplayBlocks(appHash, header, partsHeader, proxyApp.Consensus())
if err != nil {
return errors.New(Fmt("Error on replay: %v", err))
}
// TODO: (on restart) replay mempool
return nil
} }
//-----------------------------------------------------------------------------
// Replay all blocks after blockHeight and ensure the result matches the current state.
func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader,
appConnConsensus proxy.AppConnConsensus) error {
// NOTE/TODO: tendermint may crash after the app commits
// but before it can save the new state root.
// it should save all eg. valset changes before calling Commit.
// then, if tm state is behind app state, the only thing missing can be app hash
type InvalidTxError struct {
Tx types.Tx
Code tmsp.CodeType
// get a fresh state and reset to the apps latest
stateCopy := h.state.Copy()
// TODO: put validators in iavl tree so we can set the state with an older validator set
lastVals, nextVals := stateCopy.GetValidators()
if header == nil {
stateCopy.LastBlockHeight = 0
stateCopy.LastBlockID = types.BlockID{}
// stateCopy.LastBlockTime = ... doesnt matter
stateCopy.Validators = nextVals
stateCopy.LastValidators = lastVals
} else {
stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
}
stateCopy.Stale = false
stateCopy.AppHash = appHash
appBlockHeight := stateCopy.LastBlockHeight
coreBlockHeight := h.store.Height()
if coreBlockHeight < appBlockHeight {
// if the app is ahead, there's nothing we can do
return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight}
} else if coreBlockHeight == appBlockHeight {
// if we crashed between Commit and SaveState,
// the state's app hash is stale.
// otherwise we're synced
if h.state.Stale {
h.state.Stale = false
h.state.AppHash = appHash
}
return checkState(h.state, stateCopy)
} else if h.state.LastBlockHeight == appBlockHeight {
// core is ahead of app but core's state height is at apps height
// this happens if we crashed after saving the block,
// but before committing it. We should be 1 ahead
if coreBlockHeight != appBlockHeight+1 {
PanicSanity(Fmt("core.state.height == app.height but core.height (%d) > app.height+1 (%d)", coreBlockHeight, appBlockHeight+1))
}
// check that the blocks last apphash is the states apphash
block := h.store.LoadBlock(coreBlockHeight)
if !bytes.Equal(block.Header.AppHash, appHash) {
return ErrLastStateMismatch{coreBlockHeight, block.Header.AppHash, appHash}
}
// replay the block against the actual tendermint state (not the copy)
return h.loadApplyBlock(coreBlockHeight, h.state, appConnConsensus)
} else {
// either we're caught up or there's blocks to replay
// replay all blocks starting with appBlockHeight+1
for i := appBlockHeight + 1; i <= coreBlockHeight; i++ {
h.loadApplyBlock(i, stateCopy, appConnConsensus)
}
return checkState(h.state, stateCopy)
}
} }
func (txErr InvalidTxError) Error() string {
return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code)
func checkState(s, stateCopy *State) error {
// The computed state and the previously set state should be identical
if !s.Equals(stateCopy) {
return ErrStateMismatch{stateCopy, s}
}
return nil
}
func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsensus proxy.AppConnConsensus) error {
h.nBlocks += 1
block := h.store.LoadBlock(blockIndex)
panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX
var eventCache types.Fireable // nil
return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet(h.config.GetInt("block_part_size")).Header(), mockMempool{})
}
func panicOnNilBlock(height, bsHeight int, block *types.Block) {
if block == nil {
// Sanity?
PanicCrisis(Fmt(`
block is nil for height <= blockStore.Height() (%d <= %d).
Block: %v,
`, height, bsHeight, block))
}
} }

+ 193
- 0
state/execution_test.go View File

@ -0,0 +1,193 @@
package state
import (
"bytes"
"fmt"
"path"
"testing"
"github.com/tendermint/tendermint/config/tendermint_test"
// . "github.com/tendermint/go-common"
"github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/dummy"
)
var (
privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test"))
chainID = "handshake_chain"
nBlocks = 5
mempool = mockMempool{}
testPartSize = 65536
)
func TestExecBlock(t *testing.T) {
// TODO
}
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
testHandshakeReplay(t, 0)
}
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
testHandshakeReplay(t, 1)
}
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
testHandshakeReplay(t, nBlocks-1)
}
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
testHandshakeReplay(t, nBlocks)
}
// Make some blocks. Start a fresh app and apply n blocks. Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, n int) {
config := tendermint_test.ResetConfig("proxy_test_")
state, store := stateAndStore()
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2")))
proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store))
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
chain := makeBlockchain(t, proxyApp, state)
store.chain = chain //
latestAppHash := state.AppHash
proxyApp.Stop()
if n > 0 {
// start a new app without handshake, play n blocks
proxyApp = proxy.NewAppConns(config, clientCreator2, nil)
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
state2, _ := stateAndStore()
for i := 0; i < n; i++ {
block := chain[i]
err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
if err != nil {
t.Fatal(err)
}
}
proxyApp.Stop()
}
// now start it with the handshake
handshaker := NewHandshaker(config, state, store)
proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker)
if _, err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
// get the latest app hash from the app
r, _, blockInfo, _ := proxyApp.Query().InfoSync()
if r.IsErr() {
t.Fatal(r)
}
// the app hash should be synced up
if !bytes.Equal(latestAppHash, blockInfo.AppHash) {
t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", blockInfo.AppHash, latestAppHash)
}
if handshaker.nBlocks != nBlocks-n {
t.Fatalf("Expected handshake to sync %d blocks, got %d", nBlocks-n, handshaker.nBlocks)
}
}
//--------------------------
// make some bogus txs
func txsFunc(blockNum int) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)}))
}
return txs
}
// sign a commit vote
func signCommit(height, round int, hash []byte, header types.PartSetHeader) *types.Vote {
vote := &types.Vote{
ValidatorIndex: 0,
ValidatorAddress: privKey.PubKey().Address(),
Height: height,
Round: round,
Type: types.VoteTypePrecommit,
BlockID: types.BlockID{hash, header},
}
sig := privKey.Sign(types.SignBytes(chainID, vote))
vote.Signature = sig.(crypto.SignatureEd25519)
return vote
}
// make a blockchain with one validator
func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockchain []*types.Block) {
prevHash := state.LastBlockID.Hash
lastCommit := new(types.Commit)
prevParts := types.PartSetHeader{}
valHash := state.Validators.Hash()
prevBlockID := types.BlockID{prevHash, prevParts}
for i := 1; i < nBlocks+1; i++ {
block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit,
prevBlockID, valHash, state.AppHash, testPartSize)
fmt.Println(i)
fmt.Println(prevBlockID)
fmt.Println(block.LastBlockID)
err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
if err != nil {
t.Fatal(i, err)
}
voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators)
vote := signCommit(i, 0, block.Hash(), parts.Header())
_, err = voteSet.AddVote(vote)
if err != nil {
t.Fatal(err)
}
blockchain = append(blockchain, block)
prevHash = block.Hash()
prevParts = parts.Header()
lastCommit = voteSet.MakeCommit()
prevBlockID = types.BlockID{prevHash, prevParts}
}
return blockchain
}
// fresh state and mock store
func stateAndStore() (*State, *mockBlockStore) {
stateDB := dbm.NewMemDB()
return MakeGenesisState(stateDB, &types.GenesisDoc{
ChainID: chainID,
Validators: []types.GenesisValidator{
types.GenesisValidator{privKey.PubKey(), 10000, "test"},
},
AppHash: nil,
}), NewMockBlockStore(nil)
}
//----------------------------------
// mock block store
type mockBlockStore struct {
chain []*types.Block
}
func NewMockBlockStore(chain []*types.Block) *mockBlockStore {
return &mockBlockStore{chain}
}
func (bs *mockBlockStore) Height() int { return len(bs.chain) }
func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] }

+ 51
- 6
state/state.go View File

@ -7,6 +7,7 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -20,16 +21,25 @@ var (
// NOTE: not goroutine-safe. // NOTE: not goroutine-safe.
type State struct { type State struct {
mtx sync.Mutex
db dbm.DB
GenesisDoc *types.GenesisDoc
ChainID string
// mtx for writing to db
mtx sync.Mutex
db dbm.DB
// should not change
GenesisDoc *types.GenesisDoc
ChainID string
// updated at end of ExecBlock
LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist.
LastBlockID types.BlockID LastBlockID types.BlockID
LastBlockTime time.Time LastBlockTime time.Time
Validators *types.ValidatorSet Validators *types.ValidatorSet
LastValidators *types.ValidatorSet LastValidators *types.ValidatorSet
AppHash []byte
// AppHash is updated after Commit;
// it's stale after ExecBlock and before Commit
Stale bool
AppHash []byte
} }
func LoadState(db dbm.DB) *State { func LoadState(db dbm.DB) *State {
@ -59,6 +69,7 @@ func (s *State) Copy() *State {
LastBlockTime: s.LastBlockTime, LastBlockTime: s.LastBlockTime,
Validators: s.Validators.Copy(), Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(), LastValidators: s.LastValidators.Copy(),
Stale: s.Stale, // XXX: but really state shouldnt be copied while its stale
AppHash: s.AppHash, AppHash: s.AppHash,
} }
} }
@ -66,13 +77,47 @@ func (s *State) Copy() *State {
func (s *State) Save() { func (s *State) Save() {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
s.db.Set(stateKey, s.Bytes())
}
func (s *State) Equals(s2 *State) bool {
return bytes.Equal(s.Bytes(), s2.Bytes())
}
func (s *State) Bytes() []byte {
buf, n, err := new(bytes.Buffer), new(int), new(error) buf, n, err := new(bytes.Buffer), new(int), new(error)
wire.WriteBinary(s, buf, n, err) wire.WriteBinary(s, buf, n, err)
if *err != nil { if *err != nil {
PanicCrisis(*err) PanicCrisis(*err)
} }
s.db.Set(stateKey, buf.Bytes())
return buf.Bytes()
}
// Mutate state variables to match block and validators
// Since we don't have the new AppHash yet, we set s.Stale=true
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
s.LastBlockHeight = header.Height
s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader}
s.LastBlockTime = header.Time
s.Validators = nextValSet
s.LastValidators = prevValSet
s.Stale = true
}
func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {
return s.LastValidators, s.Validators
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func GetState(config cfg.Config, stateDB dbm.DB) *State {
state := LoadState(stateDB)
if state == nil {
state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
state.Save()
}
return state
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------


+ 42
- 0
state/state_test.go View File

@ -0,0 +1,42 @@
package state
import (
"testing"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/config/tendermint_test"
)
func TestStateCopyEquals(t *testing.T) {
config := tendermint_test.ResetConfig("state_")
// Get State db
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := GetState(config, stateDB)
stateCopy := state.Copy()
if !state.Equals(stateCopy) {
t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", stateCopy, state)
}
stateCopy.LastBlockHeight += 1
if state.Equals(stateCopy) {
t.Fatal("expected states to be different. got same %v", state)
}
}
func TestStateSaveLoad(t *testing.T) {
config := tendermint_test.ResetConfig("state_")
// Get State db
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := GetState(config, stateDB)
state.LastBlockHeight += 1
state.Save()
loadedState := LoadState(stateDB)
if !state.Equals(loadedState) {
t.Fatal("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state)
}
}

+ 68
- 0
test/persist/test.sh View File

@ -0,0 +1,68 @@
#! /bin/bash
export TMROOT=$HOME/.tendermint_persist
rm -rf $TMROOT
tendermint init
function start_procs(){
name=$1
echo "Starting persistent dummy and tendermint"
dummy --persist $TMROOT/dummy &> "dummy_${name}.log" &
PID_DUMMY=$!
tendermint node &> tendermint_${name}.log &
PID_TENDERMINT=$!
sleep 5
}
function kill_procs(){
kill -9 $PID_DUMMY $PID_TENDERMINT
}
function send_txs(){
# send a bunch of txs over a few blocks
echo "Sending txs"
for i in `seq 1 5`; do
for j in `seq 1 100`; do
tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'`
curl -s 127.0.0.1:46657/broadcast_tx_async?tx=\"$tx\" &> /dev/null
done
sleep 1
done
}
start_procs 1
send_txs
kill_procs
start_procs 2
# wait for node to handshake and make a new block
addr="localhost:46657"
curl -s $addr/status > /dev/null
ERR=$?
i=0
while [ "$ERR" != 0 ]; do
sleep 1
curl -s $addr/status > /dev/null
ERR=$?
i=$(($i + 1))
if [[ $i == 10 ]]; then
echo "Timed out waiting for tendermint to start"
exit 1
fi
done
# wait for a new block
h1=`curl -s $addr/status | jq .result[1].latest_block_height`
h2=$h1
while [ "$h2" == "$h1" ]; do
sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
done
kill_procs
echo "Passed Test: Persistence"

+ 104
- 0
test/persist/test2.sh View File

@ -0,0 +1,104 @@
#! /bin/bash
export TMROOT=$HOME/.tendermint_persist
rm -rf $TMROOT
tendermint init
function start_procs(){
name=$1
indexToFail=$2
echo "Starting persistent dummy and tendermint"
dummy --persist $TMROOT/dummy &> "dummy_${name}.log" &
PID_DUMMY=$!
if [[ "$indexToFail" == "" ]]; then
# run in background, dont fail
tendermint node &> tendermint_${name}.log &
PID_TENDERMINT=$!
else
# run in foreground, fail
FAIL_TEST_INDEX=$indexToFail tendermint node &> tendermint_${name}.log
PID_TENDERMINT=$!
fi
}
function kill_procs(){
kill -9 $PID_DUMMY $PID_TENDERMINT
wait $PID_DUMMY
wait $PID_TENDERMINT
}
# wait till node is up, send txs
function send_txs(){
addr="127.0.0.1:46657"
curl -s $addr/status > /dev/null
ERR=$?
while [ "$ERR" != 0 ]; do
sleep 1
curl -s $addr/status > /dev/null
ERR=$?
done
# send a bunch of txs over a few blocks
echo "Node is up, sending txs"
for i in `seq 1 5`; do
for j in `seq 1 100`; do
tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'`
curl -s $addr/broadcast_tx_async?tx=\"$tx\" &> /dev/null
done
sleep 1
done
}
failsStart=0
fails=`grep -r "fail.Fail" --include \*.go . | wc -l`
failsEnd=$(($fails-1))
for failIndex in `seq $failsStart $failsEnd`; do
echo ""
echo "* Test FailIndex $failIndex"
# test failure at failIndex
send_txs &
start_procs 1 $failIndex
# tendermint should fail when it hits the fail index
kill -9 $PID_DUMMY
wait $PID_DUMMY
start_procs 2
# wait for node to handshake and make a new block
addr="localhost:46657"
curl -s $addr/status > /dev/null
ERR=$?
i=0
while [ "$ERR" != 0 ]; do
sleep 1
curl -s $addr/status > /dev/null
ERR=$?
i=$(($i + 1))
if [[ $i == 10 ]]; then
echo "Timed out waiting for tendermint to start"
exit 1
fi
done
# wait for a new block
h1=`curl -s $addr/status | jq .result[1].latest_block_height`
h2=$h1
while [ "$h2" == "$h1" ]; do
sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
done
kill_procs
echo "* Passed Test for FailIndex $failIndex"
echo ""
done
echo "Passed Test: Persistence"

+ 3
- 0
test/run_test.sh View File

@ -11,6 +11,9 @@ bash test/test_cover.sh
# run the app tests # run the app tests
bash test/app/test.sh bash test/app/test.sh
# run the persistence test
bash test/persist/test.sh
if [[ "$BRANCH" == "master" || $(echo "$BRANCH" | grep "release-") != "" ]]; then if [[ "$BRANCH" == "master" || $(echo "$BRANCH" | grep "release-") != "" ]]; then
echo "" echo ""
echo "* branch $BRANCH; testing libs" echo "* branch $BRANCH; testing libs"


+ 21
- 0
types/block.go View File

@ -21,6 +21,27 @@ type Block struct {
LastCommit *Commit `json:"last_commit"` LastCommit *Commit `json:"last_commit"`
} }
func MakeBlock(height int, chainID string, txs []Tx, commit *Commit,
prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) {
block := &Block{
Header: &Header{
ChainID: chainID,
Height: height,
Time: time.Now(),
NumTxs: len(txs),
LastBlockID: prevBlockID,
ValidatorsHash: valHash,
AppHash: appHash, // state merkle root of txs from the previous block.
},
LastCommit: commit,
Data: &Data{
Txs: txs,
},
}
block.FillHeader()
return block, block.MakePartSet(partSize)
}
// Basic validation that doesn't involve state data. // Basic validation that doesn't involve state data.
func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockID BlockID, func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockID BlockID,
lastBlockTime time.Time, appHash []byte) error { lastBlockTime time.Time, appHash []byte) error {


+ 3
- 1
types/events.go View File

@ -130,7 +130,9 @@ func NewEventCache(evsw EventSwitch) EventCache {
// All events should be based on this FireEvent to ensure they are TMEventData // All events should be based on this FireEvent to ensure they are TMEventData
func fireEvent(fireable events.Fireable, event string, data TMEventData) { func fireEvent(fireable events.Fireable, event string, data TMEventData) {
fireable.FireEvent(event, data)
if fireable != nil {
fireable.FireEvent(event, data)
}
} }
func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) {


+ 44
- 0
types/protobuf.go View File

@ -0,0 +1,44 @@
package types
import (
"github.com/tendermint/tmsp/types"
)
// Convert tendermint types to protobuf types
var TM2PB = tm2pb{}
type tm2pb struct{}
func (tm2pb) Header(header *Header) *types.Header {
return &types.Header{
ChainId: header.ChainID,
Height: int32(header.Height),
Time: uint64(header.Time.Unix()),
NumTxs: uint64(header.NumTxs),
LastBlockId: TM2PB.BlockID(header.LastBlockID),
LastCommitHash: header.LastCommitHash,
DataHash: header.DataHash,
AppHash: header.AppHash,
}
}
func (tm2pb) BlockID(blockID BlockID) *types.BlockID {
return &types.BlockID{
Hash: blockID.Hash,
Parts: TM2PB.PartSetHeader(blockID.PartsHeader),
}
}
func (tm2pb) PartSetHeader(partSetHeader PartSetHeader) *types.PartSetHeader {
return &types.PartSetHeader{
Total: uint64(partSetHeader.Total),
Hash: partSetHeader.Hash,
}
}
func (tm2pb) Validator(val *Validator) *types.Validator {
return &types.Validator{
PubKey: val.PubKey.Bytes(),
Power: uint64(val.VotingPower),
}
}

+ 3
- 2
types/validator.go View File

@ -11,8 +11,9 @@ import (
) )
// Volatile state for each Validator // Volatile state for each Validator
// Also persisted with the state, but fields change
// every height|round so they don't go in merkle.Tree
// TODO: make non-volatile identity
// - Remove LastCommitHeight, send bitarray of vals that signed in BeginBlock
// - Remove Accum - it can be computed, and now valset becomes identifying
type Validator struct { type Validator struct {
Address []byte `json:"address"` Address []byte `json:"address"`
PubKey crypto.PubKey `json:"pub_key"` PubKey crypto.PubKey `json:"pub_key"`


+ 1
- 1
types/validator_set.go View File

@ -20,7 +20,7 @@ import (
// NOTE: Not goroutine-safe. // NOTE: Not goroutine-safe.
// NOTE: All get/set to validators should copy the value for safety. // NOTE: All get/set to validators should copy the value for safety.
// TODO: consider validator Accum overflow // TODO: consider validator Accum overflow
// TODO: replace validators []*Validator with github.com/jaekwon/go-ibbs?
// TODO: move valset into an iavl tree where key is 'blockbonded|pubkey'
type ValidatorSet struct { type ValidatorSet struct {
Validators []*Validator // NOTE: persisted via reflect, must be exported. Validators []*Validator // NOTE: persisted via reflect, must be exported.


Loading…
Cancel
Save