Browse Source

rpc: TMResult and TMEventData

pull/178/head
Ethan Buchman 9 years ago
parent
commit
3fdb4c03ab
7 changed files with 82 additions and 66 deletions
  1. +0
    -11
      node/node.go
  2. +4
    -6
      rpc/core/events.go
  3. +22
    -22
      rpc/core/routes.go
  4. +11
    -6
      rpc/core/types/responses.go
  5. +9
    -6
      rpc/test/client_test.go
  6. +23
    -8
      rpc/test/helpers.go
  7. +13
    -7
      types/events.go

+ 0
- 11
node/node.go View File

@ -17,14 +17,12 @@ import (
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc"
"github.com/tendermint/go-rpc/server" "github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/golang" "github.com/tendermint/tmsp/example/golang"
@ -184,15 +182,6 @@ func (n *Node) StartRPC() (net.Listener, error) {
listenAddr := config.GetString("rpc_laddr") listenAddr := config.GetString("rpc_laddr")
// register the result objects with wire
// so consumers of tendermint rpc will not have
// conflicts with their own rpc
wire.RegisterInterface(
struct{ rpctypes.Result }{},
wire.ConcreteType{&events.EventResult{}, 0x1},
wire.ConcreteType{&ctypes.TendermintResult{}, 0x2},
)
mux := http.NewServeMux() mux := http.NewServeMux()
wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw) wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw)
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)


+ 4
- 6
rpc/core/events.go View File

@ -4,6 +4,7 @@ import (
"github.com/tendermint/go-events" "github.com/tendermint/go-events"
"github.com/tendermint/go-rpc/types" "github.com/tendermint/go-rpc/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
) )
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
@ -11,17 +12,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking // NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event" // NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, ""))
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)})
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, ""))
}) })
return &ctypes.ResultSubscribe{}, nil return &ctypes.ResultSubscribe{}, nil
} }
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) {
log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, ""))
})
wsCtx.GetEventSwitch().RemoveListener(event)
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }

+ 22
- 22
rpc/core/routes.go View File

@ -22,90 +22,90 @@ var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events. // subscribe/unsubscribe are reserved for websocket events.
} }
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) {
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Subscribe(wsCtx, event); err != nil { if r, err := Subscribe(wsCtx, event); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) {
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Unsubscribe(wsCtx, event); err != nil { if r, err := Unsubscribe(wsCtx, event); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func StatusResult() (*ctypes.TendermintResult, error) {
func StatusResult() (ctypes.TMResult, error) {
if r, err := Status(); err != nil { if r, err := Status(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func NetInfoResult() (*ctypes.TendermintResult, error) {
func NetInfoResult() (ctypes.TMResult, error) {
if r, err := NetInfo(); err != nil { if r, err := NetInfo(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func BlockchainInfoResult(min, max int) (*ctypes.TendermintResult, error) {
func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) {
if r, err := BlockchainInfo(min, max); err != nil { if r, err := BlockchainInfo(min, max); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func GenesisResult() (*ctypes.TendermintResult, error) {
func GenesisResult() (ctypes.TMResult, error) {
if r, err := Genesis(); err != nil { if r, err := Genesis(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func GetBlockResult(height int) (*ctypes.TendermintResult, error) {
func GetBlockResult(height int) (ctypes.TMResult, error) {
if r, err := GetBlock(height); err != nil { if r, err := GetBlock(height); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func ListValidatorsResult() (*ctypes.TendermintResult, error) {
func ListValidatorsResult() (ctypes.TMResult, error) {
if r, err := ListValidators(); err != nil { if r, err := ListValidators(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func DumpConsensusStateResult() (*ctypes.TendermintResult, error) {
func DumpConsensusStateResult() (ctypes.TMResult, error) {
if r, err := DumpConsensusState(); err != nil { if r, err := DumpConsensusState(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func ListUnconfirmedTxsResult() (*ctypes.TendermintResult, error) {
func ListUnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := ListUnconfirmedTxs(); err != nil { if r, err := ListUnconfirmedTxs(); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }
func BroadcastTxResult(tx []byte) (*ctypes.TendermintResult, error) {
func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTx(tx); err != nil { if r, err := BroadcastTx(tx); err != nil {
return nil, err return nil, err
} else { } else {
return &ctypes.TendermintResult{r}, nil
return r, nil
} }
} }

+ 11
- 6
rpc/core/types/responses.go View File

@ -3,6 +3,7 @@ package core_types
import ( import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -66,6 +67,11 @@ type ResultSubscribe struct {
type ResultUnsubscribe struct { type ResultUnsubscribe struct {
} }
type ResultEvent struct {
Name string `json:"name"`
Data types.TMEventData `json:"data"`
}
//---------------------------------------- //----------------------------------------
// response & result types // response & result types
@ -81,18 +87,16 @@ const (
ResultTypeListUnconfirmedTxs = byte(0x09) ResultTypeListUnconfirmedTxs = byte(0x09)
ResultTypeSubscribe = byte(0x0A) ResultTypeSubscribe = byte(0x0A)
ResultTypeUnsubscribe = byte(0x0B) ResultTypeUnsubscribe = byte(0x0B)
ResultTypeEvent = byte(0x0C)
) )
type TendermintResultInterface interface{}
// NOTE: up to the application to register this as rpctypes.Result
type TendermintResult struct {
Result TendermintResultInterface
type TMResult interface {
rpctypes.Result
} }
// for wire.readReflect // for wire.readReflect
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ TendermintResultInterface }{},
struct{ TMResult }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock},
@ -104,4 +108,5 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},
wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},
) )

+ 9
- 6
rpc/test/client_test.go View File

@ -13,23 +13,26 @@ import (
// Test the HTTP client // Test the HTTP client
func TestURIStatus(t *testing.T) { func TestURIStatus(t *testing.T) {
result, err := clientURI.Call("status", map[string]interface{}{})
tmResult := new(ctypes.TMResult)
_, err := clientURI.Call("status", map[string]interface{}{}, tmResult)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
testStatus(t, result)
testStatus(t, tmResult)
} }
func TestJSONStatus(t *testing.T) { func TestJSONStatus(t *testing.T) {
result, err := clientJSON.Call("status", []interface{}{})
tmResult := new(ctypes.TMResult)
_, err := clientJSON.Call("status", []interface{}{}, tmResult)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
testStatus(t, result)
testStatus(t, tmResult)
} }
func testStatus(t *testing.T, result interface{}) {
status := result.(*ctypes.TendermintResult).Result.(*ctypes.ResultStatus)
func testStatus(t *testing.T, statusI interface{}) {
tmRes := statusI.(*ctypes.TMResult)
status := (*tmRes).(*ctypes.ResultStatus)
if status.NodeInfo.Network != chainID { if status.NodeInfo.Network != chainID {
t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s",
status.NodeInfo.Network, chainID)) status.NodeInfo.Network, chainID))


+ 23
- 8
rpc/test/helpers.go View File

@ -1,6 +1,7 @@
package rpctest package rpctest
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"testing" "testing"
@ -11,11 +12,11 @@ import (
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/go-events"
client "github.com/tendermint/go-rpc/client" client "github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-rpc/types" "github.com/tendermint/go-rpc/types"
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -133,14 +134,24 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou
// if the event id isnt what we're waiting on // if the event id isnt what we're waiting on
// ignore it // ignore it
var response rpctypes.RPCResponse var response rpctypes.RPCResponse
var err error
wire.ReadJSON(&response, p, &err)
if err := json.Unmarshal(p, &response); err != nil {
errCh <- err
break
}
if response.Error != "" {
errCh <- fmt.Errorf(response.Error)
break
}
result := new(ctypes.TMResult)
fmt.Println("RESULT:", string(*response.Result))
wire.ReadJSONPtr(result, *response.Result, &err)
if err != nil { if err != nil {
errCh <- err errCh <- err
break break
} }
event, ok := response.Result.(*events.EventResult)
if ok && event.Event == eventid {
event, ok := (*result).(*ctypes.ResultEvent)
if ok && event.Name == eventid {
goodCh <- p goodCh <- p
break break
} }
@ -186,14 +197,18 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) {
// unmarshall and assert somethings // unmarshall and assert somethings
var response rpctypes.RPCResponse var response rpctypes.RPCResponse
var err error var err error
wire.ReadJSON(&response, b, &err)
if err != nil {
if err := json.Unmarshal(b, &response); err != nil {
return nil, err return nil, err
} }
if response.Error != "" { if response.Error != "" {
return nil, fmt.Errorf(response.Error) return nil, fmt.Errorf(response.Error)
} }
block := response.Result.(*events.EventResult).Data.(types.EventDataNewBlock).Block
var result ctypes.TMResult
wire.ReadJSONPtr(&result, *response.Result, &err)
if err != nil {
return nil, err
}
block := result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block
return block, nil return block, nil
} }


+ 13
- 7
types/events.go View File

@ -1,7 +1,7 @@
package types package types
import ( import (
// for registering EventData
// for registering TMEventData as events.EventData
"github.com/tendermint/go-events" "github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
@ -30,6 +30,12 @@ func EventStringApp() string { return "App" }
//---------------------------------------- //----------------------------------------
// implements events.EventData
type TMEventData interface {
events.EventData
// AssertIsTMEventData()
}
const ( const (
EventDataTypeNewBlock = byte(0x01) EventDataTypeNewBlock = byte(0x01)
EventDataTypeFork = byte(0x02) EventDataTypeFork = byte(0x02)
@ -41,7 +47,7 @@ const (
) )
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ events.EventData }{},
struct{ TMEventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
@ -92,8 +98,8 @@ type EventDataVote struct {
Vote *Vote Vote *Vote
} }
func (_ EventDataNewBlock) AssertIsEventData() {}
func (_ EventDataTx) AssertIsEventData() {}
func (_ EventDataApp) AssertIsEventData() {}
func (_ EventDataRoundState) AssertIsEventData() {}
func (_ EventDataVote) AssertIsEventData() {}
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataApp) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}

Loading…
Cancel
Save