
Merge pull request #270 from tendermint/query_rpc

Query rpc
Ethan Buchman 8 years ago
committed by GitHub
commit a58e8d47c6
14 changed files with 299 additions and 101 deletions
 1. config/tendermint_test/config.go   (+7  -7)
 2. consensus/state.go                 (+9  -12)
 3. node/node.go                       (+18 -10)
 4. node/node_test.go                  (+1  -6)
 5. proxy/app_conn_test.go             (+6  -3)
 6. proxy/client.go                    (+80 -0)
 7. proxy/multi_app_conn.go            (+14 -48)
 8. rpc/core/pipe.go                   (+6  -0)
 9. rpc/core/routes.go                 (+19 -0)
10. rpc/core/tmsp.go                   (+17 -0)
11. rpc/core/types/responses.go        (+14 -0)
12. rpc/test/client_test.go            (+71 -10)
13. rpc/test/helpers.go                (+1  -4)
14. test/app/dummy_test.sh             (+36 -1)

config/tendermint_test/config.go (+7 -7)

@@ -87,13 +87,13 @@ func ResetConfig(localPath string) cfg.Config {
     mapConfig.SetDefault("block_size", 10000)
     mapConfig.SetDefault("disable_data_hash", false)
-    mapConfig.SetDefault("timeout_propose", 3000)
-    mapConfig.SetDefault("timeout_propose_delta", 1000)
-    mapConfig.SetDefault("timeout_prevote", 2000)
-    mapConfig.SetDefault("timeout_prevote_delta", 1000)
-    mapConfig.SetDefault("timeout_precommit", 2000)
-    mapConfig.SetDefault("timeout_precommit_delta", 1000)
-    mapConfig.SetDefault("timeout_commit", 1000)
+    mapConfig.SetDefault("timeout_propose", 2000)
+    mapConfig.SetDefault("timeout_propose_delta", 500)
+    mapConfig.SetDefault("timeout_prevote", 1000)
+    mapConfig.SetDefault("timeout_prevote_delta", 500)
+    mapConfig.SetDefault("timeout_precommit", 1000)
+    mapConfig.SetDefault("timeout_precommit_delta", 500)
+    mapConfig.SetDefault("timeout_commit", 100)
     mapConfig.SetDefault("mempool_recheck", true)
     mapConfig.SetDefault("mempool_recheck_empty", true)
     mapConfig.SetDefault("mempool_broadcast", true)


consensus/state.go (+9 -12)

@@ -318,13 +318,13 @@ func (cs *ConsensusState) OnStart() error {
         // let's go for it anyways, maybe we're fine
     }
-    // schedule the first round!
-    cs.scheduleRound0(cs.Height)
-    // start the receiveRoutine last
-    // to avoid races (catchupReplay may have queued tocks/messages)
+    // now start the receiveRoutine
     go cs.receiveRoutine(0)
+    // schedule the first round!
+    // use GetRoundState so we don't race the receiveRoutine for access
+    cs.scheduleRound0(cs.GetRoundState())
     return nil
 }
@@ -421,13 +421,13 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
 }
 // enterNewRound(height, 0) at cs.StartTime.
-func (cs *ConsensusState) scheduleRound0(height int) {
+func (cs *ConsensusState) scheduleRound0(rs *RoundState) {
     //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
-    sleepDuration := cs.StartTime.Sub(time.Now())
+    sleepDuration := rs.StartTime.Sub(time.Now())
     if sleepDuration < time.Duration(0) {
         sleepDuration = time.Duration(0)
     }
-    cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight)
+    cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight)
 }
 // Attempt to schedule a timeout by sending timeoutInfo on the tickChan.
@@ -1270,7 +1270,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
     // cs.StartTime is already set.
     // Schedule Round0 to start soon.
-    cs.scheduleRound0(height + 1)
+    cs.scheduleRound0(&cs.RoundState)
     // By here,
     // * cs.Height has been increment to height+1
@@ -1286,9 +1286,6 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo
     cs.mempool.Lock()
     defer cs.mempool.Unlock()
-    // flush out any CheckTx that have already started
-    // cs.proxyAppConn.FlushSync() // ?! XXX
     // Commit block, get hash back
     res := cs.proxyAppConn.CommitSync()
     if res.IsErr() {


node/node.go (+18 -10)

@@ -44,7 +44,14 @@ type Node struct {
     proxyApp proxy.AppConns
 }
-func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {
+func NewNodeDefault(config cfg.Config) *Node {
+    // Get PrivValidator
+    privValidatorFile := config.GetString("priv_validator_file")
+    privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
+    return NewNode(config, privValidator, proxy.DefaultClientCreator(config))
+}
+func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
     EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
@@ -60,7 +67,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {
     // Create the proxyApp, which houses three connections:
     // query, consensus, and mempool
-    proxyApp := proxy.NewAppConns(config, state, blockStore)
+    proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
     // add the chainid and number of validators to the global config
     config.Set("chain_id", state.ChainID)
@@ -116,6 +123,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {
     // if the query return code is OK, add peer
     // XXX: query format subject to change
     if config.GetBool("filter_peers") {
+        // NOTE: addr is ip:port
         sw.SetAddrFilter(func(addr net.Addr) error {
             res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String())))
             if res.IsOK() {
@@ -201,6 +209,7 @@ func (n *Node) StartRPC() ([]net.Listener, error) {
     rpccore.SetSwitch(n.sw)
     rpccore.SetPrivValidator(n.privValidator)
     rpccore.SetGenesisDoc(n.genesisDoc)
+    rpccore.SetProxyAppQuery(n.proxyApp.Query())
     listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",")
@@ -299,9 +308,12 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
 //------------------------------------------------------------------------------
-// Users wishing to use an external signer for their validators
+// Users wishing to:
+// * use an external signer for their validators
+// * supply an in-proc tmsp app
 // should fork tendermint/tendermint and implement RunNode to
-// load their custom priv validator and call NewNode
+// call NewNode with their custom priv validator and/or custom
+// proxy.ClientCreator interface
 func RunNode(config cfg.Config) {
     // Wait until the genesis doc becomes available
     genDocFile := config.GetString("genesis_file")
@@ -324,12 +336,8 @@ func RunNode(config cfg.Config) {
         }
     }
-    // Get PrivValidator
-    privValidatorFile := config.GetString("priv_validator_file")
-    privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
     // Create & start node
-    n := NewNode(config, privValidator)
+    n := NewNodeDefault(config)
     protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
     l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
@@ -384,7 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
     // Create two proxyAppConn connections,
     // one for the consensus and one for the mempool.
-    proxyApp := proxy.NewAppConns(config, state, blockStore)
+    proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
     // add the chainid to the global config
     config.Set("chain_id", state.ChainID)
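
The reworked comment on RunNode above describes the new extension point. A rough sketch (not part of this diff) of what a fork's setup could look like, using only identifiers this PR introduces or keeps (types.LoadOrGenPrivValidator, proxy.NewLocalClientCreator, the new NewNode signature); "myApp" and the helper name are hypothetical:

    // Sketch only: a fork's RunNode variant that wires a custom in-proc TMSP app
    // into NewNode instead of going through NewNodeDefault.
    // "myApp" is a hypothetical tmsp types.Application; listeners/start are elided.
    func runNodeWithMyApp(config cfg.Config, myApp tmsptypes.Application) *node.Node {
        privValidator := types.LoadOrGenPrivValidator(config.GetString("priv_validator_file"))
        clientCreator := proxy.NewLocalClientCreator(myApp) // in-proc app behind a mutex
        return node.NewNode(config, privValidator, clientCreator)
        // ... then add listeners and start the node, as RunNode does.
    }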


node/node_test.go (+1 -6)

@@ -6,18 +6,13 @@ import (
     "github.com/tendermint/go-p2p"
     "github.com/tendermint/tendermint/config/tendermint_test"
-    "github.com/tendermint/tendermint/types"
 )
 func TestNodeStartStop(t *testing.T) {
     config := tendermint_test.ResetConfig("node_node_test")
-    // Get PrivValidator
-    privValidatorFile := config.GetString("priv_validator_file")
-    privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
     // Create & start node
-    n := NewNode(config, privValidator)
+    n := NewNodeDefault(config)
     protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
     l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
     n.AddListener(l)


proxy/app_conn_test.go (+6 -3)

@@ -45,6 +45,7 @@ var SOCKET = "socket"
 func TestEcho(t *testing.T) {
     sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
+    clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
     // Start server
     s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
@@ -53,7 +54,7 @@ func TestEcho(t *testing.T) {
     }
     defer s.Stop()
     // Start client
-    cli, err := NewTMSPClient(sockPath, SOCKET)
+    cli, err := clientCreator.NewTMSPClient()
     if err != nil {
         Exit(err.Error())
     }
@@ -69,6 +70,7 @@ func TestEcho(t *testing.T) {
 func BenchmarkEcho(b *testing.B) {
     b.StopTimer() // Initialize
     sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
+    clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
     // Start server
     s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
     if err != nil {
@@ -76,7 +78,7 @@ func BenchmarkEcho(b *testing.B) {
     }
     defer s.Stop()
     // Start client
-    cli, err := NewTMSPClient(sockPath, SOCKET)
+    cli, err := clientCreator.NewTMSPClient()
     if err != nil {
         Exit(err.Error())
     }
@@ -97,6 +99,7 @@ func BenchmarkEcho(b *testing.B) {
 func TestInfo(t *testing.T) {
     sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
+    clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
     // Start server
     s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
     if err != nil {
@@ -104,7 +107,7 @@ func TestInfo(t *testing.T) {
     }
     defer s.Stop()
     // Start client
-    cli, err := NewTMSPClient(sockPath, SOCKET)
+    cli, err := clientCreator.NewTMSPClient()
     if err != nil {
         Exit(err.Error())
     }


proxy/client.go (+80 -0)

@@ -0,0 +1,80 @@
+package proxy
+import (
+    "fmt"
+    "sync"
+    cfg "github.com/tendermint/go-config"
+    tmspcli "github.com/tendermint/tmsp/client"
+    "github.com/tendermint/tmsp/example/dummy"
+    nilapp "github.com/tendermint/tmsp/example/nil"
+    "github.com/tendermint/tmsp/types"
+)
+// NewTMSPClient returns newly connected client
+type ClientCreator interface {
+    NewTMSPClient() (tmspcli.Client, error)
+}
+//----------------------------------------------------
+// local proxy uses a mutex on an in-proc app
+type localClientCreator struct {
+    mtx *sync.Mutex
+    app types.Application
+}
+func NewLocalClientCreator(app types.Application) ClientCreator {
+    return &localClientCreator{
+        mtx: new(sync.Mutex),
+        app: app,
+    }
+}
+func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) {
+    return tmspcli.NewLocalClient(l.mtx, l.app), nil
+}
+//---------------------------------------------------------------
+// remote proxy opens new connections to an external app process
+type remoteClientCreator struct {
+    addr        string
+    transport   string
+    mustConnect bool
+}
+func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator {
+    return &remoteClientCreator{
+        addr:        addr,
+        transport:   transport,
+        mustConnect: mustConnect,
+    }
+}
+func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) {
+    // Run forever in a loop
+    remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect)
+    if err != nil {
+        return nil, fmt.Errorf("Failed to connect to proxy: %v", err)
+    }
+    return remoteApp, nil
+}
+//-----------------------------------------------------------------
+// default
+func DefaultClientCreator(config cfg.Config) ClientCreator {
+    addr := config.GetString("proxy_app")
+    transport := config.GetString("tmsp")
+    switch addr {
+    case "dummy":
+        return NewLocalClientCreator(dummy.NewDummyApplication())
+    case "nil":
+        return NewLocalClientCreator(nilapp.NewNilApplication())
+    default:
+        mustConnect := true
+        return NewRemoteClientCreator(addr, transport, mustConnect)
+    }
+}
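
A usage sketch (again, not part of the diff) of how a creator is resolved and used, mirroring the switch in DefaultClientCreator above and the way multiAppConn asks for one client per connection:

    // Sketch only: "dummy" and "nil" resolve to in-proc example apps wrapped by
    // localClientCreator; any other proxy_app value is dialed with the configured
    // "tmsp" transport and mustConnect=true.
    cc := proxy.DefaultClientCreator(config) // e.g. config has proxy_app = "dummy"
    cli, err := cc.NewTMSPClient()           // multiAppConn calls this once per connection (query, mempool, consensus)
    if err != nil {
        // for a remote address, a failed dial is returned here
    }
    _ = cli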

proxy/multi_app_conn.go (+14 -48)

@@ -1,56 +1,22 @@
 package proxy
 import (
-    "fmt"
-    "sync"
     . "github.com/tendermint/go-common"
     cfg "github.com/tendermint/go-config"
-    tmspcli "github.com/tendermint/tmsp/client"
-    "github.com/tendermint/tmsp/example/dummy"
-    nilapp "github.com/tendermint/tmsp/example/nil"
 )
-// Get a connected tmsp client
-func NewTMSPClient(addr, transport string) (tmspcli.Client, error) {
-    var client tmspcli.Client
-    // use local app (for testing)
-    // TODO: local proxy app conn
-    switch addr {
-    case "nilapp":
-        app := nilapp.NewNilApplication()
-        mtx := new(sync.Mutex) // TODO
-        client = tmspcli.NewLocalClient(mtx, app)
-    case "dummy":
-        app := dummy.NewDummyApplication()
-        mtx := new(sync.Mutex) // TODO
-        client = tmspcli.NewLocalClient(mtx, app)
-    default:
-        // Run forever in a loop
-        mustConnect := false
-        remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect)
-        if err != nil {
-            return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err)
-        }
-        client = remoteApp
-    }
-    return client, nil
-}
+//---------
+// Tendermint's interface to the application consists of multiple connections
 type AppConns interface {
     Mempool() AppConnMempool
     Consensus() AppConnConsensus
     Query() AppConnQuery
 }
-func NewAppConns(config cfg.Config, state State, blockStore BlockStore) AppConns {
-    return NewMultiAppConn(config, state, blockStore)
+func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns {
+    return NewMultiAppConn(config, clientCreator, state, blockStore)
 }
-// a multiAppConn is made of a few appConns (mempool, consensus)
+// a multiAppConn is made of a few appConns (mempool, consensus, query)
 // and manages their underlying tmsp clients, ensuring they reboot together
 type multiAppConn struct {
     QuitService
@@ -63,14 +29,17 @@ type multiAppConn struct {
     mempoolConn   *appConnMempool
     consensusConn *appConnConsensus
     queryConn     *appConnQuery
+    clientCreator ClientCreator
 }
 // Make all necessary tmsp connections to the application
-func NewMultiAppConn(config cfg.Config, state State, blockStore BlockStore) *multiAppConn {
+func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn {
     multiAppConn := &multiAppConn{
-        config:     config,
-        state:      state,
-        blockStore: blockStore,
+        config:        config,
+        state:         state,
+        blockStore:    blockStore,
+        clientCreator: clientCreator,
     }
     multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn)
     multiAppConn.Start()
@@ -94,25 +63,22 @@ func (app *multiAppConn) Query() AppConnQuery {
 func (app *multiAppConn) OnStart() error {
     app.QuitService.OnStart()
-    addr := app.config.GetString("proxy_app")
-    transport := app.config.GetString("tmsp")
     // query connection
-    querycli, err := NewTMSPClient(addr, transport)
+    querycli, err := app.clientCreator.NewTMSPClient()
     if err != nil {
         return err
     }
     app.queryConn = NewAppConnQuery(querycli)
     // mempool connection
-    memcli, err := NewTMSPClient(addr, transport)
+    memcli, err := app.clientCreator.NewTMSPClient()
     if err != nil {
         return err
     }
     app.mempoolConn = NewAppConnMempool(memcli)
     // consensus connection
-    concli, err := NewTMSPClient(addr, transport)
+    concli, err := app.clientCreator.NewTMSPClient()
     if err != nil {
         return err
     }


rpc/core/pipe.go (+6 -0)

@@ -8,6 +8,7 @@ import (
     bc "github.com/tendermint/tendermint/blockchain"
     "github.com/tendermint/tendermint/consensus"
     mempl "github.com/tendermint/tendermint/mempool"
+    "github.com/tendermint/tendermint/proxy"
     "github.com/tendermint/tendermint/types"
 )
@@ -19,6 +20,7 @@ var mempoolReactor *mempl.MempoolReactor
 var p2pSwitch *p2p.Switch
 var privValidator *types.PrivValidator
 var genDoc *types.GenesisDoc // cache the genesis structure
+var proxyAppQuery proxy.AppConnQuery
 var config cfg.Config = nil
@@ -57,3 +59,7 @@ func SetPrivValidator(pv *types.PrivValidator) {
 func SetGenesisDoc(doc *types.GenesisDoc) {
     genDoc = doc
 }
+func SetProxyAppQuery(appConn proxy.AppConnQuery) {
+    proxyAppQuery = appConn
+}

rpc/core/routes.go (+19 -0)

@@ -25,6 +25,9 @@ var Routes = map[string]*rpc.RPCFunc{
     "unconfirmed_txs":     rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
     "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
+    "tmsp_query": rpc.NewRPCFunc(TMSPQueryResult, "query"),
+    "tmsp_info":  rpc.NewRPCFunc(TMSPInfoResult, ""),
     "unsafe_flush_mempool":      rpc.NewRPCFunc(UnsafeFlushMempool, ""),
     "unsafe_set_config":         rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"),
     "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"),
@@ -152,6 +155,22 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
     }
 }
+func TMSPQueryResult(query []byte) (ctypes.TMResult, error) {
+    if r, err := TMSPQuery(query); err != nil {
+        return nil, err
+    } else {
+        return r, nil
+    }
+}
+func TMSPInfoResult() (ctypes.TMResult, error) {
+    if r, err := TMSPInfo(); err != nil {
+        return nil, err
+    } else {
+        return r, nil
+    }
+}
 func UnsafeFlushMempoolResult() (ctypes.TMResult, error) {
     if r, err := UnsafeFlushMempool(); err != nil {
         return nil, err


rpc/core/tmsp.go (+17 -0)

@@ -0,0 +1,17 @@
+package core
+import (
+    ctypes "github.com/tendermint/tendermint/rpc/core/types"
+)
+//-----------------------------------------------------------------------------
+func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) {
+    res := proxyAppQuery.QuerySync(query)
+    return &ctypes.ResultTMSPQuery{res}, nil
+}
+func TMSPInfo() (*ctypes.ResultTMSPInfo, error) {
+    res := proxyAppQuery.InfoSync()
+    return &ctypes.ResultTMSPInfo{res}, nil
+}

rpc/core/types/responses.go (+14 -0)

@@ -68,6 +68,14 @@ type ResultUnconfirmedTxs struct {
     Txs []types.Tx `json:"txs"`
 }
+type ResultTMSPInfo struct {
+    Result tmsp.Result `json:"result"`
+}
+type ResultTMSPQuery struct {
+    Result tmsp.Result `json:"result"`
+}
 type ResultUnsafeFlushMempool struct{}
 type ResultUnsafeSetConfig struct{}
@@ -107,6 +115,10 @@ const (
     ResultTypeBroadcastTx    = byte(0x60)
     ResultTypeUnconfirmedTxs = byte(0x61)
+    // 0x7 bytes are for querying the application
+    ResultTypeTMSPQuery = byte(0x70)
+    ResultTypeTMSPInfo  = byte(0x71)
     // 0x8 bytes are for events
     ResultTypeSubscribe   = byte(0x80)
     ResultTypeUnsubscribe = byte(0x81)
@@ -145,4 +157,6 @@ var _ = wire.RegisterInterface(
     wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler},
     wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile},
     wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool},
+    wire.ConcreteType{&ResultTMSPQuery{}, ResultTypeTMSPQuery},
+    wire.ConcreteType{&ResultTMSPInfo{}, ResultTypeTMSPInfo},
 )

rpc/test/client_test.go (+71 -10)

@@ -2,9 +2,12 @@ package rpctest
 import (
     "bytes"
-    "crypto/rand"
+    crand "crypto/rand"
     "fmt"
+    "math/rand"
+    "strings"
     "testing"
+    "time"
     . "github.com/tendermint/go-common"
     ctypes "github.com/tendermint/tendermint/rpc/core/types"
@@ -14,6 +17,7 @@ import (
 //--------------------------------------------------------------------------------
 // Test the HTTP client
+// These tests assume the dummy app
 //--------------------------------------------------------------------------------
 //--------------------------------------------------------------------------------
@@ -49,20 +53,22 @@ func testStatus(t *testing.T, statusI interface{}) {
 //--------------------------------------------------------------------------------
 // broadcast tx sync
-func testTx() []byte {
-    buf := make([]byte, 16)
-    _, err := rand.Read(buf)
+// random bytes (excluding byte('='))
+func randBytes() []byte {
+    n := rand.Intn(10) + 2
+    buf := make([]byte, n)
+    _, err := crand.Read(buf)
     if err != nil {
         panic(err)
     }
-    return buf
+    return bytes.Replace(buf, []byte("="), []byte{100}, -1)
 }
 func TestURIBroadcastTxSync(t *testing.T) {
     config.Set("block_size", 0)
     defer config.Set("block_size", -1)
     tmResult := new(ctypes.TMResult)
-    tx := testTx()
+    tx := randBytes()
     _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult)
     if err != nil {
         panic(err)
@@ -74,7 +80,7 @@ func TestJSONBroadcastTxSync(t *testing.T) {
     config.Set("block_size", 0)
     defer config.Set("block_size", -1)
     tmResult := new(ctypes.TMResult)
-    tx := testTx()
+    tx := randBytes()
     _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult)
     if err != nil {
         panic(err)
@@ -95,18 +101,73 @@ func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) {
     txs := mem.Reap(1)
     if !bytes.Equal(txs[0], tx) {
-        panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx))
+        panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], tx))
     }
     mem.Flush()
 }
+//--------------------------------------------------------------------------------
+// query
+func testTxKV() ([]byte, []byte, []byte) {
+    k := randBytes()
+    v := randBytes()
+    return k, v, []byte(Fmt("%s=%s", k, v))
+}
+func sendTx() ([]byte, []byte) {
+    tmResult := new(ctypes.TMResult)
+    k, v, tx := testTxKV()
+    _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult)
+    if err != nil {
+        panic(err)
+    }
+    fmt.Println("SENT TX", tx)
+    fmt.Printf("SENT TX %X\n", tx)
+    fmt.Printf("k %X; v %X", k, v)
+    return k, v
+}
+func TestURITMSPQuery(t *testing.T) {
+    k, v := sendTx()
+    time.Sleep(time.Second)
+    tmResult := new(ctypes.TMResult)
+    _, err := clientURI.Call("tmsp_query", map[string]interface{}{"query": Fmt("%X", k)}, tmResult)
+    if err != nil {
+        panic(err)
+    }
+    testTMSPQuery(t, tmResult, v)
+}
+func TestJSONTMSPQuery(t *testing.T) {
+    k, v := sendTx()
+    tmResult := new(ctypes.TMResult)
+    _, err := clientJSON.Call("tmsp_query", []interface{}{Fmt("%X", k)}, tmResult)
+    if err != nil {
+        panic(err)
+    }
+    testTMSPQuery(t, tmResult, v)
+}
+func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) {
+    tmRes := statusI.(*ctypes.TMResult)
+    query := (*tmRes).(*ctypes.ResultTMSPQuery)
+    if query.Result.IsErr() {
+        panic(Fmt("Query returned an err: %v", query))
+    }
+    // XXX: specific to value returned by the dummy
+    if !strings.Contains(string(query.Result.Data), "exists=true") {
+        panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data))
+    }
+}
 //--------------------------------------------------------------------------------
 // broadcast tx commit
 func TestURIBroadcastTxCommit(t *testing.T) {
     tmResult := new(ctypes.TMResult)
-    tx := testTx()
+    tx := randBytes()
     _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
     if err != nil {
         panic(err)
@@ -116,7 +177,7 @@ func TestURIBroadcastTxCommit(t *testing.T) {
 func TestJSONBroadcastTxCommit(t *testing.T) {
     tmResult := new(ctypes.TMResult)
-    tx := testTx()
+    tx := randBytes()
     _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult)
     if err != nil {
         panic(err)

rpc/test/helpers.go (+1 -4)

@@ -13,7 +13,6 @@ import (
     "github.com/tendermint/tendermint/config/tendermint_test"
     nm "github.com/tendermint/tendermint/node"
     ctypes "github.com/tendermint/tendermint/rpc/core/types"
-    "github.com/tendermint/tendermint/types"
 )
 // global variables for use across all tests
@@ -52,9 +51,7 @@ func init() {
 // create a new node and sleep forever
 func newNode(ready chan struct{}) {
     // Create & start node
-    privValidatorFile := config.GetString("priv_validator_file")
-    privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
-    node = nm.NewNode(config, privValidator)
+    node = nm.NewNodeDefault(config)
     protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr"))
     l := p2p.NewDefaultListener(protocol, address, true)
     node.AddListener(l)


test/app/dummy_test.sh (+36 -1)

@@ -13,10 +13,15 @@ TESTNAME=$1
 # store key value pair
 KEY="abcd"
 VALUE="dcba"
-curl 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\"
+curl -s 127.0.0.1:46657/broadcast_tx_commit?tx=\"$(toHex $KEY=$VALUE)\"
 echo $?
 echo ""
+###########################
+# test using the tmsp-cli
+###########################
 # we should be able to look up the key
 RESPONSE=`tmsp-cli query $KEY`
@@ -40,4 +45,34 @@ if [[ $? == 0 ]]; then
 fi
 set -e
+#############################
+# test using the /tmsp_query
+#############################
+# we should be able to look up the key
+RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"`
+RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
+set +e
+A=`echo $RESPONSE | grep exists=true`
+if [[ $? != 0 ]]; then
+    echo "Failed to find 'exists=true' for $KEY. Response:"
+    echo "$RESPONSE"
+    exit 1
+fi
+set -e
+# we should not be able to look up the value
+RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"`
+RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
+set +e
+A=`echo $RESPONSE | grep exists=true`
+if [[ $? == 0 ]]; then
+    echo "Found 'exists=true' for $VALUE when we should not have. Response:"
+    echo "$RESPONSE"
+    exit 1
+fi
+set -e
 echo "Passed Test: $TESTNAME"
