diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 53aa311cc..fff8a65b1 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -44,7 +44,7 @@ type BlockchainReactor struct { sw *p2p.Switch state *sm.State - proxyAppConn proxy.AppConn // same as consensus.proxyAppConn + proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn store *BlockStore pool *BlockPool fastSync bool @@ -55,7 +55,7 @@ type BlockchainReactor struct { evsw *events.EventSwitch } -func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { if state.LastBlockHeight == store.Height()-1 { store.height -= 1 // XXX HACK, make this better } diff --git a/consensus/state.go b/consensus/state.go index 564f02529..e7edb7308 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -215,7 +215,7 @@ type ConsensusState struct { QuitService config cfg.Config - proxyAppConn proxy.AppConn + proxyAppConn proxy.AppConnConsensus blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator @@ -238,7 +238,7 @@ type ConsensusState struct { nSteps int // used for testing to limit the number of transitions the state makes } -func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { +func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, @@ -1283,7 +1283,7 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo defer cs.mempool.Unlock() // flush out any CheckTx that have already started - cs.proxyAppConn.FlushSync() + // cs.proxyAppConn.FlushSync() // ?! XXX // Commit block, get hash back res := cs.proxyAppConn.CommitSync() diff --git a/mempool/mempool.go b/mempool/mempool.go index 6cd2227c2..34df870f8 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -49,7 +49,7 @@ type Mempool struct { config cfg.Config proxyMtx sync.Mutex - proxyAppConn proxy.AppConn + proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs counter int64 // simple incrementing counter height int // the last block Update()'d to @@ -63,7 +63,7 @@ type Mempool struct { cacheList *list.List // to remove oldest tx when cache gets too big } -func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { +func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, diff --git a/node/node.go b/node/node.go index 4842b6aee..99ed4c4bb 100644 --- a/node/node.go +++ b/node/node.go @@ -6,7 +6,6 @@ import ( "net" "net/http" "strings" - "sync" "time" . "github.com/tendermint/go-common" @@ -26,9 +25,6 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" - tmspcli "github.com/tendermint/tmsp/client" - "github.com/tendermint/tmsp/example/dummy" - "github.com/tendermint/tmsp/example/nil" ) import _ "net/http/pprof" @@ -47,7 +43,7 @@ type Node struct { privKey crypto.PrivKeyEd25519 } -func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node { +func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -61,12 +57,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp // Get State state := getState(config, stateDB) - // Create two proxyAppConn connections, + // Create the proxyApp, which houses two connections, // one for the consensus and one for the mempool. - proxyAddr := config.GetString("proxy_app") - transport := config.GetString("tmsp") - proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash) - proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash) + proxyApp := proxy.NewMultiAppConn(config, state, blockStore) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -93,14 +86,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp } // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync) + bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) // Make MempoolReactor - mempool := mempl.NewMempool(config, proxyAppConnMempool) + mempool := mempl.NewMempool(config, proxyApp.Mempool()) mempoolReactor := mempl.NewMempoolReactor(config, mempool) // Make ConsensusReactor - consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) @@ -125,6 +118,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp // run the profile server profileHost := config.GetString("prof_laddr") if profileHost != "" { + go func() { log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil)) }() @@ -270,40 +264,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 return nodeInfo } -// Get a connection to the proxyAppConn addr. -// Check the current hash, and panic if it doesn't match. -func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) { - // use local app (for testing) - switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) - proxyAppConn = tmspcli.NewLocalClient(mtx, app) - case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) - proxyAppConn = tmspcli.NewLocalClient(mtx, app) - default: - // Run forever in a loop - remoteApp, err := proxy.NewRemoteAppConn(addr, transport) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - proxyAppConn = remoteApp - } - - // Check the hash - res := proxyAppConn.CommitSync() - if res.IsErr() { - PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res)) - } - if !bytes.Equal(hash, res.Data) { - log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data)) - } - - return proxyAppConn -} - // Load the most recent state from "state" db, // or create a new one (and save) from genesis. func getState(config cfg.Config, stateDB dbm.DB) *sm.State { @@ -319,7 +279,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State { // Users wishing to use an external signer for their validators // should fork tendermint/tendermint and implement RunNode to -// load their custom priv validator and call NewNode(privVal, getProxyFunc) +// load their custom priv validator and call NewNode func RunNode(config cfg.Config) { // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") @@ -347,7 +307,7 @@ func RunNode(config cfg.Config) { privValidator := types.LoadOrGenPrivValidator(privValidatorFile) // Create & start node - n := NewNode(config, privValidator, GetProxyApp) + n := NewNode(config, privValidator) protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) @@ -402,10 +362,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. - proxyAddr := config.GetString("proxy_app") - transport := config.GetString("tmsp") - proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash) - proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash) + proxyApp := proxy.NewMultiAppConn(config, state, blockStore) // add the chainid to the global config config.Set("chain_id", state.ChainID) @@ -417,9 +374,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { Exit(Fmt("Failed to start event switch: %v", err)) } - mempool := mempl.NewMempool(config, proxyAppConnMempool) + mempool := mempl.NewMempool(config, proxyApp.Mempool()) - consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusState.SetEventSwitch(eventSwitch) return consensusState } diff --git a/node/node_test.go b/node/node_test.go index 5701c5b94..e3dd1d5ad 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -17,7 +17,7 @@ func TestNodeStartStop(t *testing.T) { privValidator := types.LoadOrGenPrivValidator(privValidatorFile) // Create & start node - n := NewNode(config, privValidator, GetProxyApp) + n := NewNode(config, privValidator) protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) n.AddListener(l) diff --git a/proxy/app_conn.go b/proxy/app_conn.go index f959d7b8c..f804c97cb 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -2,8 +2,135 @@ package proxy import ( tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/types" ) -type AppConn interface { - tmspcli.Client +//---------------------------------------------------------------------------------------- +// Enforce which tmsp msgs can be sent on a connection at the type level + +type AppConnConsensus interface { + SetResponseCallback(tmspcli.Callback) + Error() error + + InitChainSync(validators []*types.Validator) (err error) + + BeginBlockSync(height uint64) (err error) + AppendTxAsync(tx []byte) *tmspcli.ReqRes + EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) + CommitSync() (res types.Result) +} + +type AppConnMempool interface { + SetResponseCallback(tmspcli.Callback) + Error() error + + CheckTxAsync(tx []byte) *tmspcli.ReqRes + + FlushAsync() *tmspcli.ReqRes + FlushSync() error +} + +type AppConnQuery interface { + Error() error + + EchoSync(string) (res types.Result) + InfoSync() (res types.Result) + QuerySync(tx []byte) (res types.Result) + + // SetOptionSync(key string, value string) (res types.Result) +} + +//----------------------------------------------------------------------------------------- +// Implements AppConnConsensus (subset of tmspcli.Client) + +type appConnConsensus struct { + appConn tmspcli.Client +} + +func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus { + return &appConnConsensus{ + appConn: appConn, + } +} + +func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) { + app.appConn.SetResponseCallback(cb) +} +func (app *appConnConsensus) Error() error { + return app.appConn.Error() +} +func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) { + return app.appConn.InitChainSync(validators) +} +func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) { + return app.appConn.BeginBlockSync(height) +} +func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes { + return app.appConn.AppendTxAsync(tx) +} + +func (app *appConnConsensus) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) { + return app.appConn.EndBlockSync(height) +} + +func (app *appConnConsensus) CommitSync() (res types.Result) { + return app.appConn.CommitSync() +} + +//------------------------------------------------ +// Implements AppConnMempool (subset of tmspcli.Client) + +type appConnMempool struct { + appConn tmspcli.Client +} + +func NewAppConnMempool(appConn tmspcli.Client) *appConnMempool { + return &appConnMempool{ + appConn: appConn, + } +} + +func (app *appConnMempool) SetResponseCallback(cb tmspcli.Callback) { + app.appConn.SetResponseCallback(cb) +} + +func (app *appConnMempool) Error() error { + return app.appConn.Error() +} + +func (app *appConnMempool) FlushAsync() *tmspcli.ReqRes { + return app.appConn.FlushAsync() +} + +func (app *appConnMempool) FlushSync() error { + return app.appConn.FlushSync() +} + +func (app *appConnMempool) CheckTxAsync(tx []byte) *tmspcli.ReqRes { + return app.appConn.CheckTxAsync(tx) +} + +//------------------------------------------------ +// Implements AppConnQuery (subset of tmspcli.Client) + +type appConnQuery struct { + appConn tmspcli.Client +} + +func NewAppConnQuery(appConn tmspcli.Client) *appConnQuery { + return &appConnQuery{ + appConn: appConn, + } +} + +func (app *appConnQuery) Error() error { + return app.appConn.Error() +} + +func (app *appConnQuery) InfoSync() (res types.Result) { + return app.appConn.InfoSync() +} + +func (app *appConnQuery) QuerySync(tx []byte) (res types.Result) { + return app.appConn.QuerySync(tx) } diff --git a/proxy/remote_app_conn_test.go b/proxy/app_conn_test.go similarity index 62% rename from proxy/remote_app_conn_test.go rename to proxy/app_conn_test.go index 58ad50f19..00a80cb11 100644 --- a/proxy/remote_app_conn_test.go +++ b/proxy/app_conn_test.go @@ -5,10 +5,42 @@ import ( "testing" . "github.com/tendermint/go-common" + tmspcli "github.com/tendermint/tmsp/client" "github.com/tendermint/tmsp/example/dummy" "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" ) +//---------------------------------------- + +type AppConnTest interface { + EchoAsync(string) *tmspcli.ReqRes + FlushSync() error + InfoSync() (res types.Result) +} + +type appConnTest struct { + appConn tmspcli.Client +} + +func NewAppConnTest(appConn tmspcli.Client) AppConnTest { + return &appConnTest{appConn} +} + +func (app *appConnTest) EchoAsync(msg string) *tmspcli.ReqRes { + return app.appConn.EchoAsync(msg) +} + +func (app *appConnTest) FlushSync() error { + return app.appConn.FlushSync() +} + +func (app *appConnTest) InfoSync() types.Result { + return app.appConn.InfoSync() +} + +//---------------------------------------- + var SOCKET = "socket" func TestEcho(t *testing.T) { @@ -21,12 +53,12 @@ func TestEcho(t *testing.T) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := NewTMSPClient(sockPath, SOCKET) if err != nil { Exit(err.Error()) - } else { - t.Log("Connected") } + proxy := NewAppConnTest(cli) + t.Log("Connected") for i := 0; i < 1000; i++ { proxy.EchoAsync(Fmt("echo-%v", i)) @@ -44,12 +76,12 @@ func BenchmarkEcho(b *testing.B) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := NewTMSPClient(sockPath, SOCKET) if err != nil { Exit(err.Error()) - } else { - b.Log("Connected") } + proxy := NewAppConnTest(cli) + b.Log("Connected") echoString := strings.Repeat(" ", 200) b.StartTimer() // Start benchmarking tests @@ -72,12 +104,13 @@ func TestInfo(t *testing.T) { } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath, SOCKET) + cli, err := NewTMSPClient(sockPath, SOCKET) if err != nil { Exit(err.Error()) - } else { - t.Log("Connected") } + proxy := NewAppConnTest(cli) + t.Log("Connected") + res := proxy.InfoSync() if res.IsErr() { t.Errorf("Unexpected error: %v", err) diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go new file mode 100644 index 000000000..adc2747bb --- /dev/null +++ b/proxy/multi_app_conn.go @@ -0,0 +1,116 @@ +package proxy + +import ( + "fmt" + "sync" + + . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" + tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" +) + +// Get a connected tmsp client and perform handshake +func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { + var client tmspcli.Client + + // use local app (for testing) + // TODO: local proxy app conn + switch addr { + case "nilapp": + app := nilapp.NewNilApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + case "dummy": + app := dummy.NewDummyApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + default: + // Run forever in a loop + mustConnect := false + remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) + } + client = remoteApp + } + return client, nil +} + +// TODO +func Handshake(config cfg.Config, state State, blockStore BlockStore) { + // XXX: Handshake + /*res := client.CommitSync() + if res.IsErr() { + PanicCrisis(Fmt("Error in getting multiAppConnConn hash: %v", res)) + } + if !bytes.Equal(hash, res.Data) { + log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data)) + }*/ +} + +//--------- + +// a multiAppConn is made of a few appConns (mempool, consensus) +// and manages their underlying tmsp clients, ensuring they reboot together +type multiAppConn struct { + QuitService + + config cfg.Config + + state State + blockStore BlockStore + + mempoolConn *appConnMempool + consensusConn *appConnConsensus +} + +// Make all necessary tmsp connections to the application +func NewMultiAppConn(config cfg.Config, state State, blockStore BlockStore) *multiAppConn { + multiAppConn := &multiAppConn{ + config: config, + state: state, + blockStore: blockStore, + } + multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) + multiAppConn.Start() + return multiAppConn +} + +// Returns the mempool connection +func (app *multiAppConn) Mempool() AppConnMempool { + return app.mempoolConn +} + +// Returns the consensus Connection +func (app *multiAppConn) Consensus() AppConnConsensus { + return app.consensusConn +} + +func (app *multiAppConn) OnStart() error { + app.QuitService.OnStart() + + addr := app.config.GetString("proxy_app") + transport := app.config.GetString("tmsp") + + memcli, err := NewTMSPClient(addr, transport) + if err != nil { + return err + } + app.mempoolConn = NewAppConnMempool(memcli) + + concli, err := NewTMSPClient(addr, transport) + if err != nil { + return err + } + app.consensusConn = NewAppConnConsensus(concli) + + // TODO: handshake + + // TODO: replay blocks + + // TODO: (on restart) replay mempool + + return nil +} diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go deleted file mode 100644 index 9076bb450..000000000 --- a/proxy/remote_app_conn.go +++ /dev/null @@ -1,23 +0,0 @@ -package proxy - -import ( - tmspcli "github.com/tendermint/tmsp/client" -) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. -type remoteAppConn struct { - tmspcli.Client -} - -func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) { - client, err := tmspcli.NewClient(addr, transport, false) - if err != nil { - return nil, err - } - appConn := &remoteAppConn{ - Client: client, - } - return appConn, nil -} diff --git a/proxy/state.go b/proxy/state.go new file mode 100644 index 000000000..a52dd0f8a --- /dev/null +++ b/proxy/state.go @@ -0,0 +1,9 @@ +package proxy + +type State interface { + // TODO +} + +type BlockStore interface { + // TODO +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 75c8fdd34..d9a5aa915 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -54,7 +54,7 @@ func newNode(ready chan struct{}) { // Create & start node privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - node = nm.NewNode(config, privValidator, nm.GetProxyApp) + node = nm.NewNode(config, privValidator) protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, true) node.AddListener(l) diff --git a/state/execution.go b/state/execution.go index 13e776c8c..14ea965c1 100644 --- a/state/execution.go +++ b/state/execution.go @@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error { // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -55,7 +55,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn // Executes block's transactions on proxyAppConn. // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error { +func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { var validTxs, invalidTxs = 0, 0