From caeda30b72637561793c5654e8bb26e0a7b1a035 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Sep 2016 17:14:55 -0400 Subject: [PATCH] proxy: wrap NewTMSPClient in ClientCreator --- node/node.go | 10 ++--- proxy/app_conn_test.go | 9 +++-- proxy/client.go | 87 +++++++++++++++++++++++++++++------------ proxy/multi_app_conn.go | 19 ++++----- 4 files changed, 81 insertions(+), 44 deletions(-) diff --git a/node/node.go b/node/node.go index 2c3ae565a..72e08b8b8 100644 --- a/node/node.go +++ b/node/node.go @@ -48,10 +48,10 @@ func NewNodeDefault(config cfg.Config) *Node { // Get PrivValidator privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - return NewNode(config, privValidator, proxy.NewTMSPClientDefault) + return NewNode(config, privValidator, proxy.DefaultClientCreator(config)) } -func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClient proxy.NewTMSPClient) *Node { +func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -67,7 +67,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClien // Create the proxyApp, which houses three connections: // query, consensus, and mempool - proxyApp := proxy.NewAppConns(config, newTMSPClient, state, blockStore) + proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -313,7 +313,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State { // * supply an in-proc tmsp app // should fork tendermint/tendermint and implement RunNode to // call NewNode with their custom priv validator and/or custom -// proxy.NewTMSPClient function. +// proxy.ClientCreator interface func RunNode(config cfg.Config) { // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") @@ -392,7 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. - proxyApp := proxy.NewAppConns(config, proxy.NewTMSPClientDefault, state, blockStore) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) // add the chainid to the global config config.Set("chain_id", state.ChainID) diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 7e425d55e..3b7a3413e 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -45,6 +45,7 @@ var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) @@ -53,7 +54,7 @@ func TestEcho(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } @@ -69,6 +70,7 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -76,7 +78,7 @@ func BenchmarkEcho(b *testing.B) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } @@ -97,6 +99,7 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -104,7 +107,7 @@ func TestInfo(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } diff --git a/proxy/client.go b/proxy/client.go index 17e742390..5e40f8569 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -4,40 +4,77 @@ import ( "fmt" "sync" + cfg "github.com/tendermint/go-config" tmspcli "github.com/tendermint/tmsp/client" "github.com/tendermint/tmsp/example/dummy" nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/types" ) -// Function type to get a connected tmsp client -// Allows consumers to provide their own in-proc apps, -// or to implement alternate address schemes and transports -type NewTMSPClient func(addr, transport string) (tmspcli.Client, error) +// NewTMSPClient returns newly connected client +type ClientCreator interface { + NewTMSPClient() (tmspcli.Client, error) +} + +//---------------------------------------------------- +// local proxy uses a mutex on an in-proc app + +type localClientCreator struct { + mtx *sync.Mutex + app types.Application +} + +func NewLocalClientCreator(app types.Application) ClientCreator { + return &localClientCreator{ + mtx: new(sync.Mutex), + app: app, + } +} + +func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) { + return tmspcli.NewLocalClient(l.mtx, l.app), nil +} + +//--------------------------------------------------------------- +// remote proxy opens new connections to an external app process + +type remoteClientCreator struct { + addr string + transport string + mustConnect bool +} + +func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { + return &remoteClientCreator{ + addr: addr, + transport: transport, + mustConnect: mustConnect, + } +} + +func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) { + // Run forever in a loop + remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy: %v", err) + } + return remoteApp, nil +} + +//----------------------------------------------------------------- +// default -// Get a connected tmsp client. -// Offers some default in-proc apps, else socket/grpc. -func NewTMSPClientDefault(addr, transport string) (tmspcli.Client, error) { - var client tmspcli.Client +func DefaultClientCreator(config cfg.Config) ClientCreator { + addr := config.GetString("proxy_app") + transport := config.GetString("tmsp") - // use local app (for testing) - // TODO: local proxy app conn switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) + return NewLocalClientCreator(dummy.NewDummyApplication()) + case "nil": + return NewLocalClientCreator(nilapp.NewNilApplication()) default: - // Run forever in a loop - mustConnect := false - remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) - } - client = remoteApp + mustConnect := true + return NewRemoteClientCreator(addr, transport, mustConnect) } - return client, nil } diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 5c16678f9..fe009e1d6 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -12,8 +12,8 @@ type AppConns interface { Query() AppConnQuery } -func NewAppConns(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) AppConns { - return NewMultiAppConn(config, newTMSPClient, state, blockStore) +func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns { + return NewMultiAppConn(config, clientCreator, state, blockStore) } // a multiAppConn is made of a few appConns (mempool, consensus, query) @@ -30,16 +30,16 @@ type multiAppConn struct { consensusConn *appConnConsensus queryConn *appConnQuery - newTMSPClient NewTMSPClient + clientCreator ClientCreator } // Make all necessary tmsp connections to the application -func NewMultiAppConn(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) *multiAppConn { +func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn { multiAppConn := &multiAppConn{ config: config, state: state, blockStore: blockStore, - newTMSPClient: newTMSPClient, + clientCreator: clientCreator, } multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) multiAppConn.Start() @@ -63,25 +63,22 @@ func (app *multiAppConn) Query() AppConnQuery { func (app *multiAppConn) OnStart() error { app.QuitService.OnStart() - addr := app.config.GetString("proxy_app") - transport := app.config.GetString("tmsp") - // query connection - querycli, err := app.newTMSPClient(addr, transport) + querycli, err := app.clientCreator.NewTMSPClient() if err != nil { return err } app.queryConn = NewAppConnQuery(querycli) // mempool connection - memcli, err := app.newTMSPClient(addr, transport) + memcli, err := app.clientCreator.NewTMSPClient() if err != nil { return err } app.mempoolConn = NewAppConnMempool(memcli) // consensus connection - concli, err := app.newTMSPClient(addr, transport) + concli, err := app.clientCreator.NewTMSPClient() if err != nil { return err }