From 206d00ed8ce228112cc3d40fb7f3f06ae41a7cd4 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 9 Sep 2016 23:10:23 -0400 Subject: [PATCH 1/6] fixes from review --- consensus/state.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 714d17c77..51fd8864e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -318,13 +318,13 @@ func (cs *ConsensusState) OnStart() error { // let's go for it anyways, maybe we're fine } - // schedule the first round! - cs.scheduleRound0(cs.Height) - - // start the receiveRoutine last - // to avoid races (catchupReplay may have queued tocks/messages) + // now start the receiveRoutine go cs.receiveRoutine(0) + // schedule the first round! + // use GetRoundState so we don't race the receiveRoutine for access + cs.scheduleRound0(cs.GetRoundState()) + return nil } @@ -421,13 +421,13 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { } // enterNewRound(height, 0) at cs.StartTime. -func (cs *ConsensusState) scheduleRound0(height int) { +func (cs *ConsensusState) scheduleRound0(rs *RoundState) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) - sleepDuration := cs.StartTime.Sub(time.Now()) + sleepDuration := rs.StartTime.Sub(time.Now()) if sleepDuration < time.Duration(0) { sleepDuration = time.Duration(0) } - cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight) + cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -1270,7 +1270,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { // cs.StartTime is already set. // Schedule Round0 to start soon. - cs.scheduleRound0(height + 1) + cs.scheduleRound0(&cs.RoundState) // By here, // * cs.Height has been increment to height+1 @@ -1286,9 +1286,6 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo cs.mempool.Lock() defer cs.mempool.Unlock() - // flush out any CheckTx that have already started - // cs.proxyAppConn.FlushSync() // ?! XXX - // Commit block, get hash back res := cs.proxyAppConn.CommitSync() if res.IsErr() { From 035ca7ef6124ec7d562642fd2aaf7f07dc70b424 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 9 Sep 2016 23:55:24 -0400 Subject: [PATCH 2/6] proxy: NewAppConns takes a NewTMSPClient func --- node/node.go | 26 +++++++++++------- node/node_test.go | 7 +---- proxy/app_conn_test.go | 6 ++--- proxy/client.go | 43 ++++++++++++++++++++++++++++++ proxy/multi_app_conn.go | 59 ++++++++++------------------------------- rpc/test/helpers.go | 5 +--- 6 files changed, 78 insertions(+), 68 deletions(-) create mode 100644 proxy/client.go diff --git a/node/node.go b/node/node.go index f66dc836a..969d4b612 100644 --- a/node/node.go +++ b/node/node.go @@ -44,7 +44,14 @@ type Node struct { proxyApp proxy.AppConns } -func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node { +func NewNodeDefault(config cfg.Config) *Node { + // Get PrivValidator + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + return NewNode(config, privValidator, proxy.NewTMSPClientDefault) +} + +func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClient proxy.NewTMSPClient) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -60,7 +67,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node { // Create the proxyApp, which houses three connections: // query, consensus, and mempool - proxyApp := proxy.NewAppConns(config, state, blockStore) + proxyApp := proxy.NewAppConns(config, newTMSPClient, state, blockStore) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -299,9 +306,12 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State { //------------------------------------------------------------------------------ -// Users wishing to use an external signer for their validators +// Users wishing to: +// * use an external signer for their validators +// * supply an in-proc tmsp app // should fork tendermint/tendermint and implement RunNode to -// load their custom priv validator and call NewNode +// call NewNode with their custom priv validator and/or custom +// proxy.NewTMSPClient function. func RunNode(config cfg.Config) { // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") @@ -324,12 +334,8 @@ func RunNode(config cfg.Config) { } } - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node - n := NewNode(config, privValidator) + n := NewNodeDefault(config) protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) @@ -384,7 +390,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. - proxyApp := proxy.NewAppConns(config, state, blockStore) + proxyApp := proxy.NewAppConns(config, proxy.NewTMSPClientDefault, state, blockStore) // add the chainid to the global config config.Set("chain_id", state.ChainID) diff --git a/node/node_test.go b/node/node_test.go index e3dd1d5ad..880a2dab7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -6,18 +6,13 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/types" ) func TestNodeStartStop(t *testing.T) { config := tendermint_test.ResetConfig("node_node_test") - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node - n := NewNode(config, privValidator) + n := NewNodeDefault(config) protocol, address := ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp")) n.AddListener(l) diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 00a80cb11..7e425d55e 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -53,7 +53,7 @@ func TestEcho(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClient(sockPath, SOCKET) + cli, err := NewTMSPClientDefault(sockPath, SOCKET) if err != nil { Exit(err.Error()) } @@ -76,7 +76,7 @@ func BenchmarkEcho(b *testing.B) { } defer s.Stop() // Start client - cli, err := NewTMSPClient(sockPath, SOCKET) + cli, err := NewTMSPClientDefault(sockPath, SOCKET) if err != nil { Exit(err.Error()) } @@ -104,7 +104,7 @@ func TestInfo(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClient(sockPath, SOCKET) + cli, err := NewTMSPClientDefault(sockPath, SOCKET) if err != nil { Exit(err.Error()) } diff --git a/proxy/client.go b/proxy/client.go new file mode 100644 index 000000000..17e742390 --- /dev/null +++ b/proxy/client.go @@ -0,0 +1,43 @@ +package proxy + +import ( + "fmt" + "sync" + + tmspcli "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" +) + +// Function type to get a connected tmsp client +// Allows consumers to provide their own in-proc apps, +// or to implement alternate address schemes and transports +type NewTMSPClient func(addr, transport string) (tmspcli.Client, error) + +// Get a connected tmsp client. +// Offers some default in-proc apps, else socket/grpc. +func NewTMSPClientDefault(addr, transport string) (tmspcli.Client, error) { + var client tmspcli.Client + + // use local app (for testing) + // TODO: local proxy app conn + switch addr { + case "nilapp": + app := nilapp.NewNilApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + case "dummy": + app := dummy.NewDummyApplication() + mtx := new(sync.Mutex) // TODO + client = tmspcli.NewLocalClient(mtx, app) + default: + // Run forever in a loop + mustConnect := false + remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) + } + client = remoteApp + } + return client, nil +} diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index b51bf2be2..5c16678f9 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -1,56 +1,22 @@ package proxy import ( - "fmt" - "sync" - . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - tmspcli "github.com/tendermint/tmsp/client" - "github.com/tendermint/tmsp/example/dummy" - nilapp "github.com/tendermint/tmsp/example/nil" ) -// Get a connected tmsp client -func NewTMSPClient(addr, transport string) (tmspcli.Client, error) { - var client tmspcli.Client - - // use local app (for testing) - // TODO: local proxy app conn - switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) - case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) - default: - // Run forever in a loop - mustConnect := false - remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) - } - client = remoteApp - } - return client, nil -} - -//--------- - +// Tendermint's interface to the application consists of multiple connections type AppConns interface { Mempool() AppConnMempool Consensus() AppConnConsensus Query() AppConnQuery } -func NewAppConns(config cfg.Config, state State, blockStore BlockStore) AppConns { - return NewMultiAppConn(config, state, blockStore) +func NewAppConns(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) AppConns { + return NewMultiAppConn(config, newTMSPClient, state, blockStore) } -// a multiAppConn is made of a few appConns (mempool, consensus) +// a multiAppConn is made of a few appConns (mempool, consensus, query) // and manages their underlying tmsp clients, ensuring they reboot together type multiAppConn struct { QuitService @@ -63,14 +29,17 @@ type multiAppConn struct { mempoolConn *appConnMempool consensusConn *appConnConsensus queryConn *appConnQuery + + newTMSPClient NewTMSPClient } // Make all necessary tmsp connections to the application -func NewMultiAppConn(config cfg.Config, state State, blockStore BlockStore) *multiAppConn { +func NewMultiAppConn(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) *multiAppConn { multiAppConn := &multiAppConn{ - config: config, - state: state, - blockStore: blockStore, + config: config, + state: state, + blockStore: blockStore, + newTMSPClient: newTMSPClient, } multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) multiAppConn.Start() @@ -98,21 +67,21 @@ func (app *multiAppConn) OnStart() error { transport := app.config.GetString("tmsp") // query connection - querycli, err := NewTMSPClient(addr, transport) + querycli, err := app.newTMSPClient(addr, transport) if err != nil { return err } app.queryConn = NewAppConnQuery(querycli) // mempool connection - memcli, err := NewTMSPClient(addr, transport) + memcli, err := app.newTMSPClient(addr, transport) if err != nil { return err } app.mempoolConn = NewAppConnMempool(memcli) // consensus connection - concli, err := NewTMSPClient(addr, transport) + concli, err := app.newTMSPClient(addr, transport) if err != nil { return err } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index d9a5aa915..17acaf9be 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -13,7 +13,6 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/types" ) // global variables for use across all tests @@ -52,9 +51,7 @@ func init() { // create a new node and sleep forever func newNode(ready chan struct{}) { // Create & start node - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - node = nm.NewNode(config, privValidator) + node = nm.NewNodeDefault(config) protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr")) l := p2p.NewDefaultListener(protocol, address, true) node.AddListener(l) From bfa690b6f71e65ad3973a15c38b457348974b234 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Sep 2016 15:16:23 -0400 Subject: [PATCH 3/6] config: reduce timeouts during test --- config/tendermint_test/config.go | 14 +++++++------- node/node.go | 1 + 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 687720be2..120079858 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -87,13 +87,13 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("disable_data_hash", false) - mapConfig.SetDefault("timeout_propose", 3000) - mapConfig.SetDefault("timeout_propose_delta", 1000) - mapConfig.SetDefault("timeout_prevote", 2000) - mapConfig.SetDefault("timeout_prevote_delta", 1000) - mapConfig.SetDefault("timeout_precommit", 2000) - mapConfig.SetDefault("timeout_precommit_delta", 1000) - mapConfig.SetDefault("timeout_commit", 1000) + mapConfig.SetDefault("timeout_propose", 2000) + mapConfig.SetDefault("timeout_propose_delta", 500) + mapConfig.SetDefault("timeout_prevote", 1000) + mapConfig.SetDefault("timeout_prevote_delta", 500) + mapConfig.SetDefault("timeout_precommit", 1000) + mapConfig.SetDefault("timeout_precommit_delta", 500) + mapConfig.SetDefault("timeout_commit", 100) mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) diff --git a/node/node.go b/node/node.go index 969d4b612..220bf81b9 100644 --- a/node/node.go +++ b/node/node.go @@ -123,6 +123,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClien // if the query return code is OK, add peer // XXX: query format subject to change if config.GetBool("filter_peers") { + // NOTE: addr is ip:port sw.SetAddrFilter(func(addr net.Addr) error { res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String()))) if res.IsOK() { From 41918d619c5ccdafb6d57e5c7443ea78e4381a1c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 22 Aug 2016 16:00:48 -0400 Subject: [PATCH 4/6] expose query and info through rpc --- node/node.go | 1 + rpc/core/pipe.go | 6 +++ rpc/core/routes.go | 19 +++++++++ rpc/core/tmsp.go | 17 ++++++++ rpc/core/types/responses.go | 14 +++++++ rpc/test/client_test.go | 81 ++++++++++++++++++++++++++++++++----- 6 files changed, 128 insertions(+), 10 deletions(-) create mode 100644 rpc/core/tmsp.go diff --git a/node/node.go b/node/node.go index 220bf81b9..2c3ae565a 100644 --- a/node/node.go +++ b/node/node.go @@ -209,6 +209,7 @@ func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetSwitch(n.sw) rpccore.SetPrivValidator(n.privValidator) rpccore.SetGenesisDoc(n.genesisDoc) + rpccore.SetProxyAppQuery(n.proxyApp.Query()) listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",") diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 464f7fda9..90febf0b0 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -8,6 +8,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -19,6 +20,7 @@ var mempoolReactor *mempl.MempoolReactor var p2pSwitch *p2p.Switch var privValidator *types.PrivValidator var genDoc *types.GenesisDoc // cache the genesis structure +var proxyAppQuery proxy.AppConnQuery var config cfg.Config = nil @@ -57,3 +59,7 @@ func SetPrivValidator(pv *types.PrivValidator) { func SetGenesisDoc(doc *types.GenesisDoc) { genDoc = doc } + +func SetProxyAppQuery(appConn proxy.AppConnQuery) { + proxyAppQuery = appConn +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 78a0b6187..97c013ab7 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -25,6 +25,9 @@ var Routes = map[string]*rpc.RPCFunc{ "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "tmsp_query": rpc.NewRPCFunc(TMSPQueryResult, "query"), + "tmsp_info": rpc.NewRPCFunc(TMSPInfoResult, ""), + "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), @@ -152,6 +155,22 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { } } +func TMSPQueryResult(query []byte) (ctypes.TMResult, error) { + if r, err := TMSPQuery(query); err != nil { + return nil, err + } else { + return r, nil + } +} + +func TMSPInfoResult() (ctypes.TMResult, error) { + if r, err := TMSPInfo(); err != nil { + return nil, err + } else { + return r, nil + } +} + func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { if r, err := UnsafeFlushMempool(); err != nil { return nil, err diff --git a/rpc/core/tmsp.go b/rpc/core/tmsp.go new file mode 100644 index 000000000..9a19e6eeb --- /dev/null +++ b/rpc/core/tmsp.go @@ -0,0 +1,17 @@ +package core + +import ( + ctypes "github.com/tendermint/tendermint/rpc/core/types" +) + +//----------------------------------------------------------------------------- + +func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) { + res := proxyAppQuery.QuerySync(query) + return &ctypes.ResultTMSPQuery{res}, nil +} + +func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { + res := proxyAppQuery.InfoSync() + return &ctypes.ResultTMSPInfo{res}, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index c1eebb6e1..cd68addd5 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,14 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultTMSPInfo struct { + Result tmsp.Result `json:"result"` +} + +type ResultTMSPQuery struct { + Result tmsp.Result `json:"result"` +} + type ResultUnsafeFlushMempool struct{} type ResultUnsafeSetConfig struct{} @@ -107,6 +115,10 @@ const ( ResultTypeBroadcastTx = byte(0x60) ResultTypeUnconfirmedTxs = byte(0x61) + // 0x7 bytes are for querying the application + ResultTypeTMSPQuery = byte(0x70) + ResultTypeTMSPInfo = byte(0x71) + // 0x8 bytes are for events ResultTypeSubscribe = byte(0x80) ResultTypeUnsubscribe = byte(0x81) @@ -145,4 +157,6 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler}, wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile}, wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool}, + wire.ConcreteType{&ResultTMSPQuery{}, ResultTypeTMSPQuery}, + wire.ConcreteType{&ResultTMSPInfo{}, ResultTypeTMSPInfo}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 47d2ead10..345049d5a 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -2,9 +2,12 @@ package rpctest import ( "bytes" - "crypto/rand" + crand "crypto/rand" "fmt" + "math/rand" + "strings" "testing" + "time" . "github.com/tendermint/go-common" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -14,6 +17,7 @@ import ( //-------------------------------------------------------------------------------- // Test the HTTP client +// These tests assume the dummy app //-------------------------------------------------------------------------------- //-------------------------------------------------------------------------------- @@ -49,20 +53,22 @@ func testStatus(t *testing.T, statusI interface{}) { //-------------------------------------------------------------------------------- // broadcast tx sync -func testTx() []byte { - buf := make([]byte, 16) - _, err := rand.Read(buf) +// random bytes (excluding byte('=')) +func randBytes() []byte { + n := rand.Intn(10) + 2 + buf := make([]byte, n) + _, err := crand.Read(buf) if err != nil { panic(err) } - return buf + return bytes.Replace(buf, []byte("="), []byte{100}, -1) } func TestURIBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -74,7 +80,7 @@ func TestJSONBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) if err != nil { panic(err) @@ -95,18 +101,73 @@ func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) { txs := mem.Reap(1) if !bytes.Equal(txs[0], tx) { - panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx)) + panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], tx)) } mem.Flush() } +//-------------------------------------------------------------------------------- +// query + +func testTxKV() ([]byte, []byte, []byte) { + k := randBytes() + v := randBytes() + return k, v, []byte(Fmt("%s=%s", k, v)) +} + +func sendTx() ([]byte, []byte) { + tmResult := new(ctypes.TMResult) + k, v, tx := testTxKV() + _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) + if err != nil { + panic(err) + } + fmt.Println("SENT TX", tx) + fmt.Printf("SENT TX %X\n", tx) + fmt.Printf("k %X; v %X", k, v) + return k, v +} + +func TestURITMSPQuery(t *testing.T) { + k, v := sendTx() + time.Sleep(time.Second) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("tmsp_query", map[string]interface{}{"query": Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func TestJSONTMSPQuery(t *testing.T) { + k, v := sendTx() + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("tmsp_query", []interface{}{Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) { + tmRes := statusI.(*ctypes.TMResult) + query := (*tmRes).(*ctypes.ResultTMSPQuery) + if query.Result.IsErr() { + panic(Fmt("Query returned an err: %v", query)) + } + // XXX: specific to value returned by the dummy + if !strings.Contains(string(query.Result.Data), "exists=true") { + panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data)) + } +} + //-------------------------------------------------------------------------------- // broadcast tx commit func TestURIBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -116,7 +177,7 @@ func TestURIBroadcastTxCommit(t *testing.T) { func TestJSONBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) if err != nil { panic(err) From caeda30b72637561793c5654e8bb26e0a7b1a035 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Sep 2016 17:14:55 -0400 Subject: [PATCH 5/6] proxy: wrap NewTMSPClient in ClientCreator --- node/node.go | 10 ++--- proxy/app_conn_test.go | 9 +++-- proxy/client.go | 87 +++++++++++++++++++++++++++++------------ proxy/multi_app_conn.go | 19 ++++----- 4 files changed, 81 insertions(+), 44 deletions(-) diff --git a/node/node.go b/node/node.go index 2c3ae565a..72e08b8b8 100644 --- a/node/node.go +++ b/node/node.go @@ -48,10 +48,10 @@ func NewNodeDefault(config cfg.Config) *Node { // Get PrivValidator privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - return NewNode(config, privValidator, proxy.NewTMSPClientDefault) + return NewNode(config, privValidator, proxy.DefaultClientCreator(config)) } -func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClient proxy.NewTMSPClient) *Node { +func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -67,7 +67,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClien // Create the proxyApp, which houses three connections: // query, consensus, and mempool - proxyApp := proxy.NewAppConns(config, newTMSPClient, state, blockStore) + proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -313,7 +313,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State { // * supply an in-proc tmsp app // should fork tendermint/tendermint and implement RunNode to // call NewNode with their custom priv validator and/or custom -// proxy.NewTMSPClient function. +// proxy.ClientCreator interface func RunNode(config cfg.Config) { // Wait until the genesis doc becomes available genDocFile := config.GetString("genesis_file") @@ -392,7 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. - proxyApp := proxy.NewAppConns(config, proxy.NewTMSPClientDefault, state, blockStore) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore) // add the chainid to the global config config.Set("chain_id", state.ChainID) diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 7e425d55e..3b7a3413e 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -45,6 +45,7 @@ var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) @@ -53,7 +54,7 @@ func TestEcho(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } @@ -69,6 +70,7 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -76,7 +78,7 @@ func BenchmarkEcho(b *testing.B) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } @@ -97,6 +99,7 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) + clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true) // Start server s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { @@ -104,7 +107,7 @@ func TestInfo(t *testing.T) { } defer s.Stop() // Start client - cli, err := NewTMSPClientDefault(sockPath, SOCKET) + cli, err := clientCreator.NewTMSPClient() if err != nil { Exit(err.Error()) } diff --git a/proxy/client.go b/proxy/client.go index 17e742390..5e40f8569 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -4,40 +4,77 @@ import ( "fmt" "sync" + cfg "github.com/tendermint/go-config" tmspcli "github.com/tendermint/tmsp/client" "github.com/tendermint/tmsp/example/dummy" nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/types" ) -// Function type to get a connected tmsp client -// Allows consumers to provide their own in-proc apps, -// or to implement alternate address schemes and transports -type NewTMSPClient func(addr, transport string) (tmspcli.Client, error) +// NewTMSPClient returns newly connected client +type ClientCreator interface { + NewTMSPClient() (tmspcli.Client, error) +} + +//---------------------------------------------------- +// local proxy uses a mutex on an in-proc app + +type localClientCreator struct { + mtx *sync.Mutex + app types.Application +} + +func NewLocalClientCreator(app types.Application) ClientCreator { + return &localClientCreator{ + mtx: new(sync.Mutex), + app: app, + } +} + +func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) { + return tmspcli.NewLocalClient(l.mtx, l.app), nil +} + +//--------------------------------------------------------------- +// remote proxy opens new connections to an external app process + +type remoteClientCreator struct { + addr string + transport string + mustConnect bool +} + +func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { + return &remoteClientCreator{ + addr: addr, + transport: transport, + mustConnect: mustConnect, + } +} + +func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) { + // Run forever in a loop + remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect) + if err != nil { + return nil, fmt.Errorf("Failed to connect to proxy: %v", err) + } + return remoteApp, nil +} + +//----------------------------------------------------------------- +// default -// Get a connected tmsp client. -// Offers some default in-proc apps, else socket/grpc. -func NewTMSPClientDefault(addr, transport string) (tmspcli.Client, error) { - var client tmspcli.Client +func DefaultClientCreator(config cfg.Config) ClientCreator { + addr := config.GetString("proxy_app") + transport := config.GetString("tmsp") - // use local app (for testing) - // TODO: local proxy app conn switch addr { - case "nilapp": - app := nilapp.NewNilApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) case "dummy": - app := dummy.NewDummyApplication() - mtx := new(sync.Mutex) // TODO - client = tmspcli.NewLocalClient(mtx, app) + return NewLocalClientCreator(dummy.NewDummyApplication()) + case "nil": + return NewLocalClientCreator(nilapp.NewNilApplication()) default: - // Run forever in a loop - mustConnect := false - remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) - if err != nil { - return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err) - } - client = remoteApp + mustConnect := true + return NewRemoteClientCreator(addr, transport, mustConnect) } - return client, nil } diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 5c16678f9..fe009e1d6 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -12,8 +12,8 @@ type AppConns interface { Query() AppConnQuery } -func NewAppConns(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) AppConns { - return NewMultiAppConn(config, newTMSPClient, state, blockStore) +func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns { + return NewMultiAppConn(config, clientCreator, state, blockStore) } // a multiAppConn is made of a few appConns (mempool, consensus, query) @@ -30,16 +30,16 @@ type multiAppConn struct { consensusConn *appConnConsensus queryConn *appConnQuery - newTMSPClient NewTMSPClient + clientCreator ClientCreator } // Make all necessary tmsp connections to the application -func NewMultiAppConn(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) *multiAppConn { +func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn { multiAppConn := &multiAppConn{ config: config, state: state, blockStore: blockStore, - newTMSPClient: newTMSPClient, + clientCreator: clientCreator, } multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) multiAppConn.Start() @@ -63,25 +63,22 @@ func (app *multiAppConn) Query() AppConnQuery { func (app *multiAppConn) OnStart() error { app.QuitService.OnStart() - addr := app.config.GetString("proxy_app") - transport := app.config.GetString("tmsp") - // query connection - querycli, err := app.newTMSPClient(addr, transport) + querycli, err := app.clientCreator.NewTMSPClient() if err != nil { return err } app.queryConn = NewAppConnQuery(querycli) // mempool connection - memcli, err := app.newTMSPClient(addr, transport) + memcli, err := app.clientCreator.NewTMSPClient() if err != nil { return err } app.mempoolConn = NewAppConnMempool(memcli) // consensus connection - concli, err := app.newTMSPClient(addr, transport) + concli, err := app.clientCreator.NewTMSPClient() if err != nil { return err } From 28ec26462ad8968d5512f4bcc772c9437a751887 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Sep 2016 17:42:12 -0400 Subject: [PATCH 6/6] test: test dummy using rpc query --- test/app/dummy_test.sh | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/test/app/dummy_test.sh b/test/app/dummy_test.sh index d294e56f8..bb17c6c31 100644 --- a/test/app/dummy_test.sh +++ b/test/app/dummy_test.sh @@ -13,10 +13,15 @@ TESTNAME=$1 # store key value pair KEY="abcd" VALUE="dcba" -curl 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\" +curl -s 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\" echo $? echo "" + +########################### +# test using the tmsp-cli +########################### + # we should be able to look up the key RESPONSE=`tmsp-cli query $KEY` @@ -40,4 +45,34 @@ if [[ $? == 0 ]]; then fi set -e +############################# +# test using the /tmsp_query +############################# + +# we should be able to look up the key +RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"` +RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` + +set +e +A=`echo $RESPONSE | grep exists=true` +if [[ $? != 0 ]]; then + echo "Failed to find 'exists=true' for $KEY. Response:" + echo "$RESPONSE" + exit 1 +fi +set -e + +# we should not be able to look up the value +RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"` +RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p` +set +e +A=`echo $RESPONSE | grep exists=true` +if [[ $? == 0 ]]; then + echo "Found 'exists=true' for $VALUE when we should not have. Response:" + echo "$RESPONSE" + exit 1 +fi +set -e + + echo "Passed Test: $TESTNAME"