diff --git a/cmd/tendermint/flags.go b/cmd/tendermint/flags.go index 029e65ab7..1765cc545 100644 --- a/cmd/tendermint/flags.go +++ b/cmd/tendermint/flags.go @@ -9,15 +9,16 @@ import ( func parseFlags(config cfg.Config, args []string) { var ( - printHelp bool - moniker string - nodeLaddr string - seeds string - fastSync bool - skipUPNP bool - rpcLaddr string - logLevel string - proxyApp string + printHelp bool + moniker string + nodeLaddr string + seeds string + fastSync bool + skipUPNP bool + rpcLaddr string + logLevel string + proxyApp string + tmspTransport string ) // Declare flags @@ -32,6 +33,7 @@ func parseFlags(config cfg.Config, args []string) { flags.StringVar(&logLevel, "log_level", config.GetString("log_level"), "Log level") flags.StringVar(&proxyApp, "proxy_app", config.GetString("proxy_app"), "Proxy app address, or 'nilapp' or 'dummy' for local testing.") + flags.StringVar(&tmspTransport, "tmsp", config.GetString("tmsp"), "Specify tmsp transport (socket | grpc)") flags.Parse(args) if printHelp { flags.PrintDefaults() @@ -47,4 +49,5 @@ func parseFlags(config cfg.Config, args []string) { config.Set("rpc_laddr", rpcLaddr) config.Set("log_level", logLevel) config.Set("proxy_app", proxyApp) + config.Set("tmsp", tmspTransport) } diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 958fd5a44..3731c53bb 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -53,6 +53,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetRequired("chain_id") // blows up if you try to use it before setting. mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:46658") + mapConfig.SetDefault("tmsp", "socket") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:46656") mapConfig.SetDefault("seeds", "") diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 293185f7e..689ba794c 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -68,6 +68,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("chain_id", "tendermint_test") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") mapConfig.SetDefault("proxy_app", "dummy") + mapConfig.SetDefault("tmsp", "socket") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") mapConfig.SetDefault("fast_sync", false) diff --git a/mempool/mempool.go b/mempool/mempool.go index 638e17013..87a185ef2 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -112,8 +112,12 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { if _, exists := mem.cacheMap[string(tx)]; exists { if cb != nil { cb(&tmsp.Response{ - Code: tmsp.CodeType_BadNonce, // TODO or duplicate tx - Log: "Duplicate transaction (ignored)", + Value: &tmsp.Response_CheckTx{ + &tmsp.ResponseCheckTx{ + Code: tmsp.CodeType_BadNonce, // TODO or duplicate tx + Log: "Duplicate transaction (ignored)", + }, + }, }) } return nil @@ -150,18 +154,18 @@ func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) { } func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) { - switch res.Type { - case tmsp.MessageType_CheckTx: - if res.Code == tmsp.CodeType_OK { + switch r := res.Value.(type) { + case *tmsp.Response_CheckTx: + if r.CheckTx.Code == tmsp.CodeType_OK { mem.counter++ memTx := &mempoolTx{ counter: mem.counter, height: int64(mem.height), - tx: req.Data, + tx: req.GetCheckTx().Tx, } mem.txs.PushBack(memTx) } else { - log.Info("Bad Transaction", "res", res) + log.Info("Bad Transaction", "res", r) // ignore bad transaction // TODO: handle other retcodes } @@ -171,14 +175,14 @@ func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) { } func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) { - switch res.Type { - case tmsp.MessageType_CheckTx: + switch r := res.Value.(type) { + case *tmsp.Response_CheckTx: memTx := mem.recheckCursor.Value.(*mempoolTx) - if !bytes.Equal(req.Data, memTx.tx) { + if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+ - "Expected %X, got %X", req.Data, memTx.tx)) + "Expected %X, got %X", r.CheckTx.Data, memTx.tx)) } - if res.Code == tmsp.CodeType_OK { + if r.CheckTx.Code == tmsp.CodeType_OK { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. diff --git a/node/node.go b/node/node.go index d78f2d7b4..e47de0fdf 100644 --- a/node/node.go +++ b/node/node.go @@ -47,7 +47,7 @@ type Node struct { privKey crypto.PrivKeyEd25519 } -func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr string, appHash []byte) proxy.AppConn) *Node { +func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node { EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here @@ -64,8 +64,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp // Create two proxyAppConn connections, // one for the consensus and one for the mempool. proxyAddr := config.GetString("proxy_app") - proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) - proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) + transport := config.GetString("tmsp") + proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash) + proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash) // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) @@ -268,7 +269,7 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255 // Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { +func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) { // use local app (for testing) switch addr { case "nilapp": @@ -281,7 +282,7 @@ func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { proxyAppConn = tmspcli.NewLocalClient(mtx, app) default: // Run forever in a loop - remoteApp, err := proxy.NewRemoteAppConn(addr) + remoteApp, err := proxy.NewRemoteAppConn(addr, transport) if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } @@ -397,8 +398,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { // Create two proxyAppConn connections, // one for the consensus and one for the mempool. proxyAddr := config.GetString("proxy_app") - proxyAppConnMempool := GetProxyApp(proxyAddr, state.AppHash) - proxyAppConnConsensus := GetProxyApp(proxyAddr, state.AppHash) + transport := config.GetString("tmsp") + proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash) + proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash) // add the chainid to the global config config.Set("chain_id", state.ChainID) diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go index a593c4a8e..9076bb450 100644 --- a/proxy/remote_app_conn.go +++ b/proxy/remote_app_conn.go @@ -11,8 +11,8 @@ type remoteAppConn struct { tmspcli.Client } -func NewRemoteAppConn(addr string) (*remoteAppConn, error) { - client, err := tmspcli.NewClient(addr, false) +func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) { + client, err := tmspcli.NewClient(addr, transport, false) if err != nil { return nil, err } diff --git a/proxy/remote_app_conn_test.go b/proxy/remote_app_conn_test.go index ab42b9943..58ad50f19 100644 --- a/proxy/remote_app_conn_test.go +++ b/proxy/remote_app_conn_test.go @@ -9,17 +9,19 @@ import ( "github.com/tendermint/tmsp/server" ) +var SOCKET = "socket" + func TestEcho(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) // Start server - s, err := server.NewServer(sockPath, dummy.NewDummyApplication()) + s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { Exit(err.Error()) } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath) + proxy, err := NewRemoteAppConn(sockPath, SOCKET) if err != nil { Exit(err.Error()) } else { @@ -36,13 +38,13 @@ func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) // Start server - s, err := server.NewServer(sockPath, dummy.NewDummyApplication()) + s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { Exit(err.Error()) } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath) + proxy, err := NewRemoteAppConn(sockPath, SOCKET) if err != nil { Exit(err.Error()) } else { @@ -64,13 +66,13 @@ func BenchmarkEcho(b *testing.B) { func TestInfo(t *testing.T) { sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) // Start server - s, err := server.NewServer(sockPath, dummy.NewDummyApplication()) + s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) if err != nil { Exit(err.Error()) } defer s.Stop() // Start client - proxy, err := NewRemoteAppConn(sockPath) + proxy, err := NewRemoteAppConn(sockPath, SOCKET) if err != nil { Exit(err.Error()) } else { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index d06fca8b2..b0e5c0c43 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -28,10 +28,11 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return nil, fmt.Errorf("Error broadcasting transaction: %v", err) } res := <-resCh + r := res.GetCheckTx() return &ctypes.ResultBroadcastTx{ - Code: res.Code, - Data: res.Data, - Log: res.Log, + Code: r.Code, + Data: r.Data, + Log: r.Log, }, nil } diff --git a/state/execution.go b/state/execution.go index d2b027c84..230bceb29 100644 --- a/state/execution.go +++ b/state/execution.go @@ -61,16 +61,16 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy // Execute transactions and get hash proxyCb := func(req *tmsp.Request, res *tmsp.Response) { - switch res.Type { - case tmsp.MessageType_AppendTx: + switch r := res.Value.(type) { + case *tmsp.Response_AppendTx: // TODO: make use of res.Log // TODO: make use of this info // Blocks may include invalid txs. // reqAppendTx := req.(tmsp.RequestAppendTx) - if res.Code == tmsp.CodeType_OK { + if r.AppendTx.Code == tmsp.CodeType_OK { validTxs += 1 } else { - log.Debug("Invalid tx", "code", res.Code, "log", res.Log) + log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) invalidTxs += 1 } }