diff --git a/node/node.go b/node/node.go
index 45e7ae001..b34d0e037 100644
--- a/node/node.go
+++ b/node/node.go
@@ -62,8 +62,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
 	// Get State
 	state := getState(config, stateDB)
 
-	// Create the proxyApp, which houses three connections:
-	// query, consensus, and mempool
+	// Create the proxyApp, which manages connections (consensus, mempool, query)
 	proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
 	if _, err := proxyApp.Start(); err != nil {
 		Exit(Fmt("Error starting proxy app connections: %v", err))
@@ -391,9 +390,12 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
 	stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
 	state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
 
-	// Create two proxyAppConn connections,
-	// one for the consensus and one for the mempool.
+	// Create proxyAppConn connection (consensus, mempool, query)
 	proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
+	_, err := proxyApp.Start()
+	if err != nil {
+		Exit(Fmt("Error starting proxy app conns: %v", err))
+	}
 
 	// add the chainid to the global config
 	config.Set("chain_id", state.ChainID)
diff --git a/proxy/app_conn.go b/proxy/app_conn.go
index e19e6f09f..382bb3b83 100644
--- a/proxy/app_conn.go
+++ b/proxy/app_conn.go
@@ -56,15 +56,19 @@ func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
 func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
 	app.appConn.SetResponseCallback(cb)
 }
+
 func (app *appConnConsensus) Error() error {
 	return app.appConn.Error()
 }
+
 func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
 	return app.appConn.InitChainSync(validators)
 }
+
 func (app *appConnConsensus) BeginBlockSync(header *types.Header) (err error) {
 	return app.appConn.BeginBlockSync(header)
 }
+
 func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
 	return app.appConn.AppendTxAsync(tx)
 }
diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go
index 8e4c84aa2..cfd827853 100644
--- a/proxy/multi_app_conn.go
+++ b/proxy/multi_app_conn.go
@@ -1,10 +1,20 @@
 package proxy
 
 import (
+	"bytes"
+	"fmt"
+	"sync"
+
 	. "github.com/tendermint/go-common"
 	cfg "github.com/tendermint/go-config"
+	"github.com/tendermint/tendermint/types" // ...
+	tmspcli "github.com/tendermint/tmsp/client"
+	"github.com/tendermint/tmsp/example/dummy"
+	nilapp "github.com/tendermint/tmsp/example/nil"
 )
 
+//-----------------------------
+
 // Tendermint's interface to the application consists of multiple connections
 type AppConns interface {
 	Service
@@ -19,7 +29,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl
 }
 
 // a multiAppConn is made of a few appConns (mempool, consensus, query)
-// and manages their underlying tmsp clients, ensuring they reboot together
+// and manages their underlying tmsp clients, including the handshake
+// which ensures the app and tendermint are synced.
+// TODO: on app restart, clients must reboot together
 type multiAppConn struct {
 	BaseService
@@ -57,6 +69,7 @@ func (app *multiAppConn) Consensus() AppConnConsensus {
 	return app.consensusConn
 }
 
+// Returns the query connection
 func (app *multiAppConn) Query() AppConnQuery {
 	return app.queryConn
 }
@@ -85,11 +98,102 @@ func (app *multiAppConn) OnStart() error {
 	}
 	app.consensusConn = NewAppConnConsensus(concli)
 
-	// TODO: handshake
+	// ensure app is synced to the latest state
+	return app.Handshake()
+}
+
+// TODO: retry the handshake once if it fails the first time
+func (app *multiAppConn) Handshake() error {
+	// handshake is done on the query conn
+	res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync()
+	if res.IsErr() {
+		return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log)
+	}
+
+	if blockInfo == nil {
+		log.Warn("blockInfo is nil, aborting handshake")
+		return nil
+	}
+
+	log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash)
+
+	// TODO: check overflow or change pb to int32
+	blockHeight := int(blockInfo.BlockHeight)
+	blockHash := blockInfo.BlockHash
+	appHash := blockInfo.AppHash
+
+	if tmspInfo != nil {
+		// TODO: check tmsp version (or do this in the tmspcli?)
+		_ = tmspInfo
+	}
+
+	// header and partsHeader of the last block (nil if we're starting from 0)
+	var header *types.Header
+	var partsHeader types.PartSetHeader
+
+	// check block
+	// if the blockHeight == 0, we will replay everything
+	if blockHeight != 0 {
+		blockMeta := app.blockStore.LoadBlockMeta(blockHeight)
+		if blockMeta == nil {
+			return fmt.Errorf("Handshake error. Could not find block #%d", blockHeight)
+		}
+
+		// check block hash
+		if !bytes.Equal(blockMeta.Hash, blockHash) {
+			return fmt.Errorf("Handshake error. Block hash at height %d does not match. Got %X, expected %X", blockHeight, blockHash, blockMeta.Hash)
+		}
+
+		// check app hash
+		if !bytes.Equal(blockMeta.Header.AppHash, appHash) {
+			return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, blockMeta.Header.AppHash)
+		}
+
+		header = blockMeta.Header
+		partsHeader = blockMeta.PartsHeader
+	}
+
+	if configInfo != nil {
+		// TODO: set config info
+		_ = configInfo
+	}
 
-	// TODO: replay blocks
+	// replay blocks up to the latest in the blockstore
+	err := app.state.ReplayBlocks(header, partsHeader, app.consensusConn, app.blockStore)
+	if err != nil {
+		return fmt.Errorf("Error on replay: %v", err)
+	}
 
 	// TODO: (on restart) replay mempool
 
 	return nil
 }
+
+//--------------------------------
+
+// Get a connected tmsp client
+func NewTMSPClient(addr, transport string) (tmspcli.Client, error) {
+	var client tmspcli.Client
+
+	// use local app (for testing)
+	// TODO: local proxy app conn
+	switch addr {
+	case "nilapp":
+		app := nilapp.NewNilApplication()
+		mtx := new(sync.Mutex) // TODO
+		client = tmspcli.NewLocalClient(mtx, app)
+	case "dummy":
+		app := dummy.NewDummyApplication()
+		mtx := new(sync.Mutex) // TODO
+		client = tmspcli.NewLocalClient(mtx, app)
+	default:
+		// Run forever in a loop
+		mustConnect := false
+		remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect)
+		if err != nil {
+			return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err)
+		}
+		client = remoteApp
+	}
+	return client, nil
+}
diff --git a/proxy/state.go b/proxy/state.go
index a52dd0f8a..8091d49b7 100644
--- a/proxy/state.go
+++ b/proxy/state.go
@@ -1,9 +1,15 @@
 package proxy
 
+import (
+	"github.com/tendermint/tendermint/types"
+)
+
 type State interface {
-	// TODO
+	ReplayBlocks(*types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error
 }
 
 type BlockStore interface {
-	// TODO
+	Height() int
+	LoadBlockMeta(height int) *types.BlockMeta
+	LoadBlock(height int) *types.Block
 }
diff --git a/state/execution.go b/state/execution.go
index e5653e7e7..f5dcadae9 100644
--- a/state/execution.go
+++ b/state/execution.go
@@ -41,12 +41,9 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
 	}
 
 	// All good!
+	// Update validator accums and set state variables
 	nextValSet.IncrementAccum(1)
-	s.LastBlockHeight = block.Height
-	s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader}
-	s.LastBlockTime = block.Time
-	s.Validators = nextValSet
-	s.LastValidators = valSet
+	s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
 
 	return nil
 }
@@ -89,7 +86,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
 	proxyAppConn.SetResponseCallback(proxyCb)
 
 	// Begin block
-	err := proxyAppConn.BeginBlockSync(uint64(block.Height))
+	err := proxyAppConn.BeginBlockSync(types.TM2PB.Header(block.Header))
 	if err != nil {
 		log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
 		return err
@@ -181,3 +178,60 @@ type InvalidTxError struct {
 func (txErr InvalidTxError) Error() string {
 	return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code)
 }
+
+//-----------------------------------------------------------------------------
+
+// Replay all blocks after blockHeight and ensure the result matches the current state
+func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader,
+	appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error {
+
+	// fresh state to work on
+	stateCopy := s.Copy()
+
+	// reset to this height (do nothing if it's 0)
+	var blockHeight int
+	if header != nil {
+		blockHeight = header.Height
+		// TODO: put validators in iavl tree so we can set the state with an older validator set
+		lastVals, nextVals := stateCopy.GetValidators()
+		stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
+	}
+
+	// replay all blocks starting with blockHeight+1
+	for i := blockHeight + 1; i <= blockStore.Height(); i++ {
+		blockMeta := blockStore.LoadBlockMeta(i)
+		if blockMeta == nil {
+			PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height()))
+		}
+
+		block := blockStore.LoadBlock(i)
+		if block == nil {
+			PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height()))
+		}
+
+		// run the transactions
+		var eventCache events.Fireable // nil
+		err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader)
+		if err != nil {
+			return fmt.Errorf("Error on ExecBlock: %v", err)
+		}
+
+		// commit the block (app should save the state)
+		res := appConnConsensus.CommitSync()
+		if res.IsErr() {
+			return fmt.Errorf("Error on Commit: %v", res)
+		}
+		if res.Log != "" {
+			log.Debug("Commit.Log: " + res.Log)
+		}
+
+		// update the state hash
+		stateCopy.AppHash = res.Data
+	}
+
+	// The computed state and the previously set state should be identical
+	if !s.Equals(stateCopy) {
+		return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s)
+	}
+	return nil
+}
diff --git a/state/state.go b/state/state.go
index 213484863..699f612e2 100644
--- a/state/state.go
+++ b/state/state.go
@@ -66,13 +66,33 @@ func (s *State) Copy() *State {
 func (s *State) Save() {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
+	s.db.Set(stateKey, s.Bytes())
+}
+
+func (s *State) Equals(s2 *State) bool {
+	return bytes.Equal(s.Bytes(), s2.Bytes())
+}
+
+func (s *State) Bytes() []byte {
 	buf, n, err := new(bytes.Buffer), new(int), new(error)
 	wire.WriteBinary(s, buf, n, err)
 	if *err != nil {
 		PanicCrisis(*err)
 	}
-	s.db.Set(stateKey, buf.Bytes())
+	return buf.Bytes()
+}
+
+// Mutate state variables to match block and validators
+func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
+	s.LastBlockHeight = header.Height
+	s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader}
+	s.LastBlockTime = header.Time
+	s.Validators = nextValSet
+	s.LastValidators = prevValSet
+}
+
+func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {
+	return s.LastValidators, s.Validators
+}
 
 //-----------------------------------------------------------------------------
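
Note: the Handshake added in proxy/multi_app_conn.go and ReplayBlocks added in state/execution.go implement one pattern: ask the app for the last block it committed, cross-check that height, block hash, and app hash against the local block store, then re-apply any newer blocks and verify the resulting state. Below is a minimal, self-contained sketch of that pattern. The types Store, App, and BlockMeta, the ApplyBlock/Handshake functions, and the hashing scheme are simplified stand-ins invented for illustration; they are not the tendermint/tmsp APIs used in the diff above.

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// BlockMeta is a stand-in for a stored block: height, block hash, the app
// hash after applying it, and its transactions.
type BlockMeta struct {
	Height  int
	Hash    []byte
	AppHash []byte
	Txs     [][]byte
}

// Store is a stand-in block store: blocks indexed by height (1-based).
type Store struct {
	blocks []BlockMeta
}

func (s *Store) Height() int { return len(s.blocks) }

func (s *Store) Load(h int) *BlockMeta {
	if h < 1 || h > len(s.blocks) {
		return nil
	}
	return &s.blocks[h-1]
}

// App is a stand-in application: it remembers the last block it committed
// and a running app hash.
type App struct {
	LastHeight int
	LastHash   []byte
	AppHash    []byte
}

// ApplyBlock folds a block's transactions into the running app hash.
func (a *App) ApplyBlock(b *BlockMeta) {
	var buf bytes.Buffer
	buf.Write(a.AppHash)
	for _, tx := range b.Txs {
		buf.Write(tx)
	}
	sum := sha256.Sum256(buf.Bytes())
	a.AppHash = sum[:]
	a.LastHeight = b.Height
	a.LastHash = b.Hash
}

// Handshake cross-checks the app's last block against the store, then
// replays any blocks the app has not yet seen, verifying each app hash.
func Handshake(app *App, store *Store) error {
	if app.LastHeight != 0 {
		meta := store.Load(app.LastHeight)
		if meta == nil {
			return fmt.Errorf("handshake: no block at height %d", app.LastHeight)
		}
		if !bytes.Equal(meta.Hash, app.LastHash) {
			return fmt.Errorf("handshake: block hash mismatch at height %d", app.LastHeight)
		}
		if !bytes.Equal(meta.AppHash, app.AppHash) {
			return fmt.Errorf("handshake: app hash mismatch at height %d", app.LastHeight)
		}
	}
	for h := app.LastHeight + 1; h <= store.Height(); h++ {
		meta := store.Load(h)
		app.ApplyBlock(meta)
		if !bytes.Equal(app.AppHash, meta.AppHash) {
			return fmt.Errorf("replay: app hash mismatch after height %d", h)
		}
	}
	return nil
}

func main() {
	// build a small chain of 3 blocks, recording the expected app hashes
	store := &Store{}
	builder := &App{}
	for h := 1; h <= 3; h++ {
		b := BlockMeta{
			Height: h,
			Hash:   []byte(fmt.Sprintf("blockhash-%d", h)),
			Txs:    [][]byte{[]byte(fmt.Sprintf("tx-%d", h))},
		}
		builder.ApplyBlock(&b)
		b.AppHash = builder.AppHash
		store.blocks = append(store.blocks, b)
	}

	// simulate an app that stopped after committing block 1 and restarted
	restarted := &App{}
	restarted.ApplyBlock(store.Load(1))

	if err := Handshake(restarted, store); err != nil {
		fmt.Println("handshake failed:", err)
		return
	}
	fmt.Printf("handshake ok: app replayed to height %d\n", restarted.LastHeight)
}

The diff above differs from this sketch mainly in where the work happens: Info is served over the query connection while replay drives the consensus connection (ExecBlock plus CommitSync), and the app hash is checked against the AppHash recorded in the block header rather than a locally recomputed value.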