Browse Source

Merge pull request #215 from tendermint/new_protobuf

Protobuf + GRPC
pull/220/head
Jae Kwon 9 years ago
committed by GitHub
parent
commit
16987fe143
9 changed files with 57 additions and 43 deletions
  1. +12
    -9
      cmd/tendermint/flags.go
  2. +1
    -0
      config/tendermint/config.go
  3. +1
    -0
      config/tendermint_test/config.go
  4. +16
    -12
      mempool/mempool.go
  5. +9
    -7
      node/node.go
  6. +2
    -2
      proxy/remote_app_conn.go
  7. +8
    -6
      proxy/remote_app_conn_test.go
  8. +4
    -3
      rpc/core/mempool.go
  9. +4
    -4
      state/execution.go

+ 12
- 9
cmd/tendermint/flags.go View File

@ -9,15 +9,16 @@ import (
func parseFlags(config cfg.Config, args []string) { func parseFlags(config cfg.Config, args []string) {
var ( var (
printHelp bool
moniker string
nodeLaddr string
seeds string
fastSync bool
skipUPNP bool
rpcLaddr string
logLevel string
proxyApp string
printHelp bool
moniker string
nodeLaddr string
seeds string
fastSync bool
skipUPNP bool
rpcLaddr string
logLevel string
proxyApp string
tmspTransport string
) )
// Declare flags // Declare flags
@ -32,6 +33,7 @@ func parseFlags(config cfg.Config, args []string) {
flags.StringVar(&logLevel, "log_level", config.GetString("log_level"), "Log level") flags.StringVar(&logLevel, "log_level", config.GetString("log_level"), "Log level")
flags.StringVar(&proxyApp, "proxy_app", config.GetString("proxy_app"), flags.StringVar(&proxyApp, "proxy_app", config.GetString("proxy_app"),
"Proxy app address, or 'nilapp' or 'dummy' for local testing.") "Proxy app address, or 'nilapp' or 'dummy' for local testing.")
flags.StringVar(&tmspTransport, "tmsp", config.GetString("tmsp"), "Specify tmsp transport (socket | grpc)")
flags.Parse(args) flags.Parse(args)
if printHelp { if printHelp {
flags.PrintDefaults() flags.PrintDefaults()
@ -47,4 +49,5 @@ func parseFlags(config cfg.Config, args []string) {
config.Set("rpc_laddr", rpcLaddr) config.Set("rpc_laddr", rpcLaddr)
config.Set("log_level", logLevel) config.Set("log_level", logLevel)
config.Set("proxy_app", proxyApp) config.Set("proxy_app", proxyApp)
config.Set("tmsp", tmspTransport)
} }

+ 1
- 0
config/tendermint/config.go View File

@ -53,6 +53,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetRequired("chain_id") // blows up if you try to use it before setting. mapConfig.SetRequired("chain_id") // blows up if you try to use it before setting.
mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json")
mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:46658") mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:46658")
mapConfig.SetDefault("tmsp", "socket")
mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("moniker", "anonymous")
mapConfig.SetDefault("node_laddr", "0.0.0.0:46656") mapConfig.SetDefault("node_laddr", "0.0.0.0:46656")
mapConfig.SetDefault("seeds", "") mapConfig.SetDefault("seeds", "")


+ 1
- 0
config/tendermint_test/config.go View File

@ -68,6 +68,7 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("chain_id", "tendermint_test") mapConfig.SetDefault("chain_id", "tendermint_test")
mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json")
mapConfig.SetDefault("proxy_app", "dummy") mapConfig.SetDefault("proxy_app", "dummy")
mapConfig.SetDefault("tmsp", "socket")
mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("moniker", "anonymous")
mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656")
mapConfig.SetDefault("fast_sync", false) mapConfig.SetDefault("fast_sync", false)


+ 16
- 12
mempool/mempool.go View File

@ -112,8 +112,12 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
if _, exists := mem.cacheMap[string(tx)]; exists { if _, exists := mem.cacheMap[string(tx)]; exists {
if cb != nil { if cb != nil {
cb(&tmsp.Response{ cb(&tmsp.Response{
Code: tmsp.CodeType_BadNonce, // TODO or duplicate tx
Log: "Duplicate transaction (ignored)",
Value: &tmsp.Response_CheckTx{
&tmsp.ResponseCheckTx{
Code: tmsp.CodeType_BadNonce, // TODO or duplicate tx
Log: "Duplicate transaction (ignored)",
},
},
}) })
} }
return nil return nil
@ -150,18 +154,18 @@ func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) {
} }
func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) { func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) {
switch res.Type {
case tmsp.MessageType_CheckTx:
if res.Code == tmsp.CodeType_OK {
switch r := res.Value.(type) {
case *tmsp.Response_CheckTx:
if r.CheckTx.Code == tmsp.CodeType_OK {
mem.counter++ mem.counter++
memTx := &mempoolTx{ memTx := &mempoolTx{
counter: mem.counter, counter: mem.counter,
height: int64(mem.height), height: int64(mem.height),
tx: req.Data,
tx: req.GetCheckTx().Tx,
} }
mem.txs.PushBack(memTx) mem.txs.PushBack(memTx)
} else { } else {
log.Info("Bad Transaction", "res", res)
log.Info("Bad Transaction", "res", r)
// ignore bad transaction // ignore bad transaction
// TODO: handle other retcodes // TODO: handle other retcodes
} }
@ -171,14 +175,14 @@ func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) {
} }
func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) { func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) {
switch res.Type {
case tmsp.MessageType_CheckTx:
switch r := res.Value.(type) {
case *tmsp.Response_CheckTx:
memTx := mem.recheckCursor.Value.(*mempoolTx) memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(req.Data, memTx.tx) {
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) {
PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+ PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+
"Expected %X, got %X", req.Data, memTx.tx))
"Expected %X, got %X", r.CheckTx.Data, memTx.tx))
} }
if res.Code == tmsp.CodeType_OK {
if r.CheckTx.Code == tmsp.CodeType_OK {
// Good, nothing to do. // Good, nothing to do.
} else { } else {
// Tx became invalidated due to newly committed block. // Tx became invalidated due to newly committed block.


+ 9
- 7
node/node.go View File

@ -47,7 +47,7 @@ type Node struct {
privKey crypto.PrivKeyEd25519 privKey crypto.PrivKeyEd25519
} }
func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr string, appHash []byte) proxy.AppConn) *Node {
func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
@ -64,8 +64,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// Create two proxyAppConn connections, // Create two proxyAppConn connections,
// one for the consensus and one for the mempool. // one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app") proxyAddr := config.GetString("proxy_app")
proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash)
transport := config.GetString("tmsp")
proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
// add the chainid and number of validators to the global config // add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)
@ -268,7 +269,7 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
// Get a connection to the proxyAppConn addr. // Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match. // Check the current hash, and panic if it doesn't match.
func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing) // use local app (for testing)
switch addr { switch addr {
case "nilapp": case "nilapp":
@ -281,7 +282,7 @@ func GetProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) {
proxyAppConn = tmspcli.NewLocalClient(mtx, app) proxyAppConn = tmspcli.NewLocalClient(mtx, app)
default: default:
// Run forever in a loop // Run forever in a loop
remoteApp, err := proxy.NewRemoteAppConn(addr)
remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
if err != nil { if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
} }
@ -397,8 +398,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
// Create two proxyAppConn connections, // Create two proxyAppConn connections,
// one for the consensus and one for the mempool. // one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app") proxyAddr := config.GetString("proxy_app")
proxyAppConnMempool := GetProxyApp(proxyAddr, state.AppHash)
proxyAppConnConsensus := GetProxyApp(proxyAddr, state.AppHash)
transport := config.GetString("tmsp")
proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
// add the chainid to the global config // add the chainid to the global config
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)


+ 2
- 2
proxy/remote_app_conn.go View File

@ -11,8 +11,8 @@ type remoteAppConn struct {
tmspcli.Client tmspcli.Client
} }
func NewRemoteAppConn(addr string) (*remoteAppConn, error) {
client, err := tmspcli.NewClient(addr, false)
func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) {
client, err := tmspcli.NewClient(addr, transport, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 8
- 6
proxy/remote_app_conn_test.go View File

@ -9,17 +9,19 @@ import (
"github.com/tendermint/tmsp/server" "github.com/tendermint/tmsp/server"
) )
var SOCKET = "socket"
func TestEcho(t *testing.T) { func TestEcho(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server // Start server
s, err := server.NewServer(sockPath, dummy.NewDummyApplication())
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
proxy, err := NewRemoteAppConn(sockPath)
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} else { } else {
@ -36,13 +38,13 @@ func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize b.StopTimer() // Initialize
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server // Start server
s, err := server.NewServer(sockPath, dummy.NewDummyApplication())
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
proxy, err := NewRemoteAppConn(sockPath)
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} else { } else {
@ -64,13 +66,13 @@ func BenchmarkEcho(b *testing.B) {
func TestInfo(t *testing.T) { func TestInfo(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
// Start server // Start server
s, err := server.NewServer(sockPath, dummy.NewDummyApplication())
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
proxy, err := NewRemoteAppConn(sockPath)
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} else { } else {


+ 4
- 3
rpc/core/mempool.go View File

@ -28,10 +28,11 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err) return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
} }
res := <-resCh res := <-resCh
r := res.GetCheckTx()
return &ctypes.ResultBroadcastTx{ return &ctypes.ResultBroadcastTx{
Code: res.Code,
Data: res.Data,
Log: res.Log,
Code: r.Code,
Data: r.Data,
Log: r.Log,
}, nil }, nil
} }


+ 4
- 4
state/execution.go View File

@ -61,16 +61,16 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy
// Execute transactions and get hash // Execute transactions and get hash
proxyCb := func(req *tmsp.Request, res *tmsp.Response) { proxyCb := func(req *tmsp.Request, res *tmsp.Response) {
switch res.Type {
case tmsp.MessageType_AppendTx:
switch r := res.Value.(type) {
case *tmsp.Response_AppendTx:
// TODO: make use of res.Log // TODO: make use of res.Log
// TODO: make use of this info // TODO: make use of this info
// Blocks may include invalid txs. // Blocks may include invalid txs.
// reqAppendTx := req.(tmsp.RequestAppendTx) // reqAppendTx := req.(tmsp.RequestAppendTx)
if res.Code == tmsp.CodeType_OK {
if r.AppendTx.Code == tmsp.CodeType_OK {
validTxs += 1 validTxs += 1
} else { } else {
log.Debug("Invalid tx", "code", res.Code, "log", res.Log)
log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log)
invalidTxs += 1 invalidTxs += 1
} }
} }


Loading…
Cancel
Save