Browse Source

handshake

pull/265/head
Ethan Buchman 8 years ago
parent
commit
a0e4253edc
6 changed files with 206 additions and 16 deletions
  1. +6
    -4
      node/node.go
  2. +4
    -0
      proxy/app_conn.go
  3. +107
    -3
      proxy/multi_app_conn.go
  4. +8
    -2
      proxy/state.go
  5. +60
    -6
      state/execution.go
  6. +21
    -1
      state/state.go

+ 6
- 4
node/node.go View File

@ -62,8 +62,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
// Get State
state := getState(config, stateDB)
// Create the proxyApp, which houses three connections:
// query, consensus, and mempool
// Create the proxyApp, which manages connections (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
if _, err := proxyApp.Start(); err != nil {
Exit(Fmt("Error starting proxy app connections: %v", err))
@ -391,9 +390,12 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
// Create proxyAppConn connection (consensus, mempool, query)
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
_, err := proxyApp.Start()
if err != nil {
Exit(Fmt("Error starting proxy app conns: %v", err))
}
// add the chainid to the global config
config.Set("chain_id", state.ChainID)


+ 4
- 0
proxy/app_conn.go View File

@ -56,15 +56,19 @@ func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
return app.appConn.InitChainSync(validators)
}
func (app *appConnConsensus) BeginBlockSync(header *types.Header) (err error) {
return app.appConn.BeginBlockSync(header)
}
func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.AppendTxAsync(tx)
}


+ 107
- 3
proxy/multi_app_conn.go View File

@ -1,10 +1,20 @@
package proxy
import (
"bytes"
"fmt"
"sync"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/tendermint/types" // ...
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
nilapp "github.com/tendermint/tmsp/example/nil"
)
//-----------------------------
// Tendermint's interface to the application consists of multiple connections
type AppConns interface {
Service
@ -19,7 +29,9 @@ func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, bl
}
// a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying tmsp clients, ensuring they reboot together
// and manages their underlying tmsp clients, including the handshake
// which ensures the app and tendermint are synced.
// TODO: on app restart, clients must reboot together
type multiAppConn struct {
BaseService
@ -57,6 +69,7 @@ func (app *multiAppConn) Consensus() AppConnConsensus {
return app.consensusConn
}
// Returns the query Connection
func (app *multiAppConn) Query() AppConnQuery {
return app.queryConn
}
@ -85,11 +98,102 @@ func (app *multiAppConn) OnStart() error {
}
app.consensusConn = NewAppConnConsensus(concli)
// TODO: handshake
// ensure app is synced to the latest state
return app.Handshake()
}
// TODO: retry the handshake once if it fails the first time
func (app *multiAppConn) Handshake() error {
// handshake is done on the query conn
res, tmspInfo, blockInfo, configInfo := app.queryConn.InfoSync()
if res.IsErr() {
return fmt.Errorf("Error calling Info. Code: %v; Data: %X; Log: %s", res.Code, res.Data, res.Log)
}
if blockInfo == nil {
log.Warn("blockInfo is nil, aborting handshake")
return nil
}
log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash)
// TODO: check overflow or change pb to int32
blockHeight := int(blockInfo.BlockHeight)
blockHash := blockInfo.BlockHash
appHash := blockInfo.AppHash
if tmspInfo != nil {
// TODO: check tmsp version (or do this in the tmspcli?)
_ = tmspInfo
}
// of the last block (nil if we starting from 0)
var header *types.Header
var partsHeader types.PartSetHeader
// check block
// if the blockHeight == 0, we will replay everything
if blockHeight != 0 {
blockMeta := app.blockStore.LoadBlockMeta(blockHeight)
if blockMeta == nil {
return fmt.Errorf("Handshake error. Could not find block #%d", blockHeight)
}
// check block hash
if !bytes.Equal(blockMeta.Hash, blockHash) {
return fmt.Errorf("Handshake error. Block hash at height %d does not match. Got %X, expected %X", blockHeight, blockHash, blockMeta.Hash)
}
// check app hash
if !bytes.Equal(blockMeta.Header.AppHash, appHash) {
return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, blockMeta.Header.AppHash)
}
header = blockMeta.Header
partsHeader = blockMeta.PartsHeader
}
if configInfo != nil {
// TODO: set config info
_ = configInfo
}
// TODO: replay blocks
// replay blocks up to the latest in the blockstore
err := app.state.ReplayBlocks(header, partsHeader, app.consensusConn, app.blockStore)
if err != nil {
return fmt.Errorf("Error on replay: %v", err)
}
// TODO: (on restart) replay mempool
return nil
}
//--------------------------------
// Get a connected tmsp client
func NewTMSPClient(addr, transport string) (tmspcli.Client, error) {
var client tmspcli.Client
// use local app (for testing)
// TODO: local proxy app conn
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
mtx := new(sync.Mutex) // TODO
client = tmspcli.NewLocalClient(mtx, app)
case "dummy":
app := dummy.NewDummyApplication()
mtx := new(sync.Mutex) // TODO
client = tmspcli.NewLocalClient(mtx, app)
default:
// Run forever in a loop
mustConnect := false
remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err)
}
client = remoteApp
}
return client, nil
}

+ 8
- 2
proxy/state.go View File

@ -1,9 +1,15 @@
package proxy
import (
"github.com/tendermint/tendermint/types"
)
type State interface {
// TODO
ReplayBlocks(*types.Header, types.PartSetHeader, AppConnConsensus, BlockStore) error
}
type BlockStore interface {
// TODO
Height() int
LoadBlockMeta(height int) *types.BlockMeta
LoadBlock(height int) *types.Block
}

+ 60
- 6
state/execution.go View File

@ -41,12 +41,9 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
}
// All good!
// Update validator accums and set state variables
nextValSet.IncrementAccum(1)
s.LastBlockHeight = block.Height
s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader}
s.LastBlockTime = block.Time
s.Validators = nextValSet
s.LastValidators = valSet
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
return nil
}
@ -89,7 +86,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
proxyAppConn.SetResponseCallback(proxyCb)
// Begin block
err := proxyAppConn.BeginBlockSync(uint64(block.Height))
err := proxyAppConn.BeginBlockSync(types.TM2PB.Header(block.Header))
if err != nil {
log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
return err
@ -181,3 +178,60 @@ type InvalidTxError struct {
func (txErr InvalidTxError) Error() string {
return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code)
}
//-----------------------------------------------------------------------------
// Replay all blocks after blockHeight and ensure the result matches the current state
func (s *State) ReplayBlocks(header *types.Header, partsHeader types.PartSetHeader,
appConnConsensus proxy.AppConnConsensus, blockStore proxy.BlockStore) error {
// fresh state to work on
stateCopy := s.Copy()
// reset to this height (do nothing if its 0)
var blockHeight int
if header != nil {
blockHeight = header.Height
// TODO: put validators in iavl tree so we can set the state with an older validator set
lastVals, nextVals := stateCopy.GetValidators()
stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
}
// replay all blocks starting with blockHeight+1
for i := blockHeight + 1; i <= blockStore.Height(); i++ {
blockMeta := blockStore.LoadBlockMeta(i)
if blockMeta == nil {
PanicSanity(Fmt("Nil blockMeta at height %d when blockStore height is %d", i, blockStore.Height()))
}
block := blockStore.LoadBlock(i)
if block == nil {
PanicSanity(Fmt("Nil block at height %d when blockStore height is %d", i, blockStore.Height()))
}
// run the transactions
var eventCache events.Fireable // nil
err := stateCopy.ExecBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader)
if err != nil {
return fmt.Errorf("Error on ExecBlock: %v", err)
}
// commit the block (app should save the state)
res := appConnConsensus.CommitSync()
if res.IsErr() {
return fmt.Errorf("Error on Commit: %v", res)
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// update the state hash
stateCopy.AppHash = res.Data
}
// The computed state and the previously set state should be identical
if !s.Equals(stateCopy) {
return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s)
}
return nil
}

+ 21
- 1
state/state.go View File

@ -66,13 +66,33 @@ func (s *State) Copy() *State {
func (s *State) Save() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.db.Set(stateKey, s.Bytes())
}
func (s *State) Equals(s2 *State) bool {
return bytes.Equal(s.Bytes(), s2.Bytes())
}
func (s *State) Bytes() []byte {
buf, n, err := new(bytes.Buffer), new(int), new(error)
wire.WriteBinary(s, buf, n, err)
if *err != nil {
PanicCrisis(*err)
}
s.db.Set(stateKey, buf.Bytes())
return buf.Bytes()
}
// Mutate state variables to match block and validators
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
s.LastBlockHeight = header.Height
s.LastBlockID = types.BlockID{block.Hash(), blockPartsHeader}
s.LastBlockTime = header.Time
s.Validators = nextValSet
s.LastValidators = prevValSet
}
func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {
return s.LastValidators, s.Validators
}
//-----------------------------------------------------------------------------


Loading…
Cancel
Save