diff --git a/consensus/common_test.go b/consensus/common_test.go index 469fc9c6b..1443232a6 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -206,7 +206,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { go func() { for { v := <-voteCh0 - vote := v.(types.EventDataVote) + vote := v.(types.TMEventData).Unwrap().(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { voteCh <- v diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 8808185da..15e6a1550 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -44,7 +44,7 @@ func TestTxConcurrentWithCommit(t *testing.T) { for nTxs := 0; nTxs < NTxs; { select { case b := <-newBlockCh: - nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs + nTxs += b.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block.Header.NumTxs case <-ticker.C: panic("Timed out waiting to commit blocks with transactions") } diff --git a/consensus/reactor.go b/consensus/reactor.go index 5fe45cc01..5734298ef 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -8,11 +8,11 @@ import ( "sync" "time" - . "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + . "github.com/tendermint/tmlibs/common" ) const ( @@ -299,12 +299,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { func (conR *ConsensusReactor) registerEventCallbacks() { types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { - rs := data.(types.EventDataRoundState).RoundState.(*RoundState) + rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { - edv := data.(types.EventDataVote) + edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) } diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 66827c155..fdf590151 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -8,10 +8,10 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tmlibs/events" + "github.com/tendermint/abci/example/dummy" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" - "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/events" ) func init() { @@ -252,7 +252,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) { func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { newBlockI := <-eventChans[j] - newBlock := newBlockI.(types.EventDataNewBlock).Block + newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block log.Warn("Got block", "height", newBlock.Height, "validator", j) err := validateBlock(newBlock, activeVals) if err != nil { diff --git a/consensus/state_test.go b/consensus/state_test.go index d2d34e3ad..2d5ed410b 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" - . "github.com/tendermint/tmlibs/common" "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" + . "github.com/tendermint/tmlibs/common" ) func init() { @@ -248,7 +248,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() + propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote // NOTE: voteChan cap of 0 ensures we can complete this @@ -345,7 +345,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -385,7 +385,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.ProposalBlock != nil { panic("Expected proposal block to be nil") @@ -429,7 +429,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(vs2) re = <-proposalCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -518,7 +518,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -589,9 +589,9 @@ func TestLockPOLRelock(t *testing.T) { _, _ = <-voteCh, <-voteCh be := <-newBlockCh - b := be.(types.EventDataNewBlockHeader) + b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader) re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { panic("Expected height to increment") } @@ -627,7 +627,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -653,7 +653,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -713,7 +713,7 @@ func TestLockPOLSafety1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -761,7 +761,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.LockedBlock != nil { panic("we should not be locked!") @@ -1009,7 +1009,7 @@ func TestHalt1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet(partSize) @@ -1032,7 +1032,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1050,7 +1050,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { panic("expected height to increment") diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index cc421ad90..1b99854cb 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -25,7 +25,7 @@ func TestHeaderEvents(t *testing.T) { evtTyp := types.EventStringNewBlockHeader() evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) - _, ok := evt.(types.EventDataNewBlockHeader) + _, ok := evt.Unwrap().(types.EventDataNewBlockHeader) require.True(ok, "%d: %#v", i, evt) // TODO: more checks... } @@ -56,7 +56,7 @@ func TestTxEvents(t *testing.T) { evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info - txe, ok := evt.(types.EventDataTx) + txe, ok := evt.Unwrap().(types.EventDataTx) require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 330bcd199..bc26ea57f 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,9 +4,9 @@ import ( "time" "github.com/pkg/errors" + "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" events "github.com/tendermint/tmlibs/events" - "github.com/tendermint/tendermint/types" ) // Waiter is informed of current height, decided whether to quit early @@ -77,12 +77,12 @@ func WaitForOneEvent(evsw types.EventSwitch, select { case <-quit: - return nil, errors.New("timed out waiting for event") + return types.TMEventData{}, errors.New("timed out waiting for event") case evt := <-evts: tmevt, ok := evt.(types.TMEventData) if ok { return tmevt, nil } - return nil, errors.Errorf("Got unexpected event type: %#v", evt) + return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt) } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 7eecea641..cb7149406 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -1,10 +1,10 @@ package client import ( + "encoding/json" "fmt" "github.com/pkg/errors" - wire "github.com/tendermint/go-wire" data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/rpc/lib/client" @@ -50,42 +50,41 @@ func (c *HTTP) _assertIsEventSwitch() types.EventSwitch { } func (c *HTTP) Status() (*ctypes.ResultStatus, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("status", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultStatus) + _, err := c.rpc.Call("status", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Status") } - // note: panics if rpc doesn't match. okay??? - return tmResult.Unwrap().(*ctypes.ResultStatus), nil + return result, nil } func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("abci_info", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultABCIInfo) + _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "ABCIInfo") } - return tmResult.Unwrap().(*ctypes.ResultABCIInfo), nil + return result, nil } func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) { - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultABCIQuery) _, err := c.rpc.Call("abci_query", map[string]interface{}{"path": path, "data": data, "prove": prove}, - tmResult) + result) if err != nil { return nil, errors.Wrap(err, "ABCIQuery") } - return tmResult.Unwrap().(*ctypes.ResultABCIQuery), nil + return result, nil } func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) + result := new(ctypes.ResultBroadcastTxCommit) + _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) if err != nil { return nil, errors.Wrap(err, "broadcast_tx_commit") } - return tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit), nil + return result, nil } func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { @@ -97,90 +96,90 @@ func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, tmResult) + result := new(ctypes.ResultBroadcastTx) + _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result) if err != nil { return nil, errors.Wrap(err, route) } - return tmResult.Unwrap().(*ctypes.ResultBroadcastTx), nil + return result, nil } func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("net_info", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultNetInfo) + _, err := c.rpc.Call("net_info", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "NetInfo") } - return tmResult.Unwrap().(*ctypes.ResultNetInfo), nil + return result, nil } func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultDumpConsensusState) + _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "DumpConsensusState") } - return tmResult.Unwrap().(*ctypes.ResultDumpConsensusState), nil + return result, nil } func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultBlockchainInfo) _, err := c.rpc.Call("blockchain", map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight}, - tmResult) + result) if err != nil { return nil, errors.Wrap(err, "BlockchainInfo") } - return tmResult.Unwrap().(*ctypes.ResultBlockchainInfo), nil + return result, nil } func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("genesis", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultGenesis) + _, err := c.rpc.Call("genesis", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Genesis") } - return tmResult.Unwrap().(*ctypes.ResultGenesis), nil + return result, nil } func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, tmResult) + result := new(ctypes.ResultBlock) + _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Block") } - return tmResult.Unwrap().(*ctypes.ResultBlock), nil + return result, nil } func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, tmResult) + result := new(ctypes.ResultCommit) + _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Commit") } - return tmResult.Unwrap().(*ctypes.ResultCommit), nil + return result, nil } func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultTx) query := map[string]interface{}{ "hash": hash, "prove": prove, } - _, err := c.rpc.Call("tx", query, tmResult) + _, err := c.rpc.Call("tx", query, result) if err != nil { return nil, errors.Wrap(err, "Tx") } - return tmResult.Unwrap().(*ctypes.ResultTx), nil + return result, nil } func (c *HTTP) Validators() (*ctypes.ResultValidators, error) { - tmResult := new(ctypes.TMResult) - _, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultValidators) + _, err := c.rpc.Call("validators", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Validators") } - return tmResult.Unwrap().(*ctypes.ResultValidators), nil + return result, nil } /** websocket event stuff here... **/ @@ -335,18 +334,15 @@ func (w *WSEvents) eventListener() { // some implementation of types.TMEventData, and sends it off // on the merry way to the EventSwitch func (w *WSEvents) parseEvent(data []byte) (err error) { - result := new(ctypes.TMResult) - wire.ReadJSONPtr(result, data, &err) + result := new(ctypes.ResultEvent) + err = json.Unmarshal(data, result) if err != nil { - return err - } - event, ok := result.Unwrap().(*ctypes.ResultEvent) - if !ok { // ignore silently (eg. subscribe, unsubscribe and maybe other events) + // TODO: ? return nil } // looks good! let's fire this baby! - w.EventSwitch.FireEvent(event.Name, event.Data) + w.EventSwitch.FireEvent(result.Name, result.Data) return nil } diff --git a/rpc/core/events.go b/rpc/core/events.go index 271ba5301..fa40c5a81 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -11,8 +11,8 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - tmResult := ctypes.TMResult{&ctypes.ResultEvent{event, msg}} - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) + tmResult := &ctypes.ResultEvent{event, msg} + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 4c1b54cfc..a6e7093fd 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -50,7 +50,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block deliverTxResCh := make(chan types.EventDataTx, 1) types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { - deliverTxResCh <- data.(types.EventDataTx) + deliverTxResCh <- data.Unwrap().(types.EventDataTx) }) // broadcast the tx and register checktx callback diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 9e4c03be1..b662ae48c 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,173 +1,46 @@ package core import ( - data "github.com/tendermint/go-wire/data" - ctypes "github.com/tendermint/tendermint/rpc/core/types" rpc "github.com/tendermint/tendermint/rpc/lib/server" - "github.com/tendermint/tendermint/rpc/lib/types" - "github.com/tendermint/tendermint/types" ) // TODO: better system than "unsafe" prefix var Routes = map[string]*rpc.RPCFunc{ // subscribe/unsubscribe are reserved for websocket events. - "subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), - "unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), + "subscribe": rpc.NewWSRPCFunc(Subscribe, "event"), + "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "event"), // info API - "status": rpc.NewRPCFunc(StatusResult, ""), - "net_info": rpc.NewRPCFunc(NetInfoResult, ""), - "blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"), - "genesis": rpc.NewRPCFunc(GenesisResult, ""), - "block": rpc.NewRPCFunc(BlockResult, "height"), - "commit": rpc.NewRPCFunc(CommitResult, "height"), - "tx": rpc.NewRPCFunc(TxResult, "hash,prove"), - "validators": rpc.NewRPCFunc(ValidatorsResult, ""), - "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), - "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), - "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "status": rpc.NewRPCFunc(Status, ""), + "net_info": rpc.NewRPCFunc(NetInfo, ""), + "blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"), + "genesis": rpc.NewRPCFunc(Genesis, ""), + "block": rpc.NewRPCFunc(Block, "height"), + "commit": rpc.NewRPCFunc(Commit, "height"), + "tx": rpc.NewRPCFunc(Tx, "hash,prove"), + "validators": rpc.NewRPCFunc(Validators, ""), + "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), + "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, ""), + "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), // broadcast API - "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"), - "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), - "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), + "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), + "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), // abci API - "abci_query": rpc.NewRPCFunc(ABCIQueryResult, "path,data,prove"), - "abci_info": rpc.NewRPCFunc(ABCIInfoResult, ""), + "abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,prove"), + "abci_info": rpc.NewRPCFunc(ABCIInfo, ""), // control API - "dial_seeds": rpc.NewRPCFunc(UnsafeDialSeedsResult, "seeds"), + "dial_seeds": rpc.NewRPCFunc(UnsafeDialSeeds, "seeds"), "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), // config is not in general thread safe. expose specifics if you need em - // "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), + // "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfig, "type,key,value"), // profiler API - "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), - "unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfilerResult, ""), - "unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfileResult, "filename"), -} - -func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - res, err := Subscribe(wsCtx, event) - return ctypes.TMResult{res}, err -} - -func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - res, err := Unsubscribe(wsCtx, event) - return ctypes.TMResult{res}, err -} - -func StatusResult() (ctypes.TMResult, error) { - res, err := Status() - return ctypes.TMResult{res}, err -} - -func NetInfoResult() (ctypes.TMResult, error) { - res, err := NetInfo() - return ctypes.TMResult{res}, err -} - -func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) { - res, err := UnsafeDialSeeds(seeds) - return ctypes.TMResult{res}, err -} - -func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { - res, err := BlockchainInfo(min, max) - return ctypes.TMResult{res}, err -} - -func GenesisResult() (ctypes.TMResult, error) { - res, err := Genesis() - return ctypes.TMResult{res}, err -} - -func BlockResult(height int) (ctypes.TMResult, error) { - res, err := Block(height) - return ctypes.TMResult{res}, err -} - -func CommitResult(height int) (ctypes.TMResult, error) { - res, err := Commit(height) - return ctypes.TMResult{res}, err -} - -func ValidatorsResult() (ctypes.TMResult, error) { - res, err := Validators() - return ctypes.TMResult{res}, err -} - -func DumpConsensusStateResult() (ctypes.TMResult, error) { - res, err := DumpConsensusState() - return ctypes.TMResult{res}, err -} - -func UnconfirmedTxsResult() (ctypes.TMResult, error) { - res, err := UnconfirmedTxs() - return ctypes.TMResult{res}, err -} - -func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { - res, err := NumUnconfirmedTxs() - return ctypes.TMResult{res}, err -} - -// Tx allow user to query the transaction results. `nil` could mean the -// transaction is in the mempool, invalidated, or was not send in the first -// place. -func TxResult(hash []byte, prove bool) (ctypes.TMResult, error) { - res, err := Tx(hash, prove) - return ctypes.TMResult{res}, err -} - -func BroadcastTxCommitResult(tx types.Tx) (ctypes.TMResult, error) { - res, err := BroadcastTxCommit(tx) - return ctypes.TMResult{res}, err -} - -func BroadcastTxSyncResult(tx types.Tx) (ctypes.TMResult, error) { - res, err := BroadcastTxSync(tx) - return ctypes.TMResult{res}, err -} - -func BroadcastTxAsyncResult(tx types.Tx) (ctypes.TMResult, error) { - res, err := BroadcastTxAsync(tx) - return ctypes.TMResult{res}, err -} - -func ABCIQueryResult(path string, data data.Bytes, prove bool) (ctypes.TMResult, error) { - res, err := ABCIQuery(path, data, prove) - return ctypes.TMResult{res}, err -} - -func ABCIInfoResult() (ctypes.TMResult, error) { - res, err := ABCIInfo() - return ctypes.TMResult{res}, err -} - -func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { - res, err := UnsafeFlushMempool() - return ctypes.TMResult{res}, err -} - -func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { - res, err := UnsafeSetConfig(typ, key, value) - return ctypes.TMResult{res}, err -} - -func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) { - res, err := UnsafeStartCPUProfiler(filename) - return ctypes.TMResult{res}, err -} - -func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) { - res, err := UnsafeStopCPUProfiler() - return ctypes.TMResult{res}, err -} - -func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) { - res, err := UnsafeWriteHeapProfile(filename) - return ctypes.TMResult{res}, err + "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfiler, "filename"), + "unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfiler, ""), + "unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfile, "filename"), } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 387d4afa6..5bd6e1806 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -8,6 +8,9 @@ import ( "github.com/tendermint/tendermint/types" ) +// Tx allow user to query the transaction results. `nil` could mean the +// transaction is in the mempool, invalidated, or was not send in the first +// place. func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 9b4e9d579..23d68587b 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -7,7 +7,7 @@ import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/rpc/lib/types" + // "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" ) @@ -212,60 +212,3 @@ const ( ResultNameUnsafeWriteHeapProfile = "unsafe_write_heap" ResultNameUnsafeFlushMempool = "unsafe_flush_mempool" ) - -type TMResultInner interface { - rpctypes.Result -} - -type TMResult struct { - TMResultInner `json:"unwrap"` -} - -func (tmr TMResult) MarshalJSON() ([]byte, error) { - return tmResultMapper.ToJSON(tmr.TMResultInner) -} - -func (tmr *TMResult) UnmarshalJSON(data []byte) (err error) { - parsed, err := tmResultMapper.FromJSON(data) - if err == nil && parsed != nil { - tmr.TMResultInner = parsed.(TMResultInner) - } - return -} - -func (tmr TMResult) Unwrap() TMResultInner { - tmrI := tmr.TMResultInner - for wrap, ok := tmrI.(TMResult); ok; wrap, ok = tmrI.(TMResult) { - tmrI = wrap.TMResultInner - } - return tmrI -} - -func (tmr TMResult) Empty() bool { - return tmr.TMResultInner == nil -} - -var tmResultMapper = data.NewMapper(TMResult{}). - RegisterImplementation(&ResultGenesis{}, ResultNameGenesis, ResultTypeGenesis). - RegisterImplementation(&ResultBlockchainInfo{}, ResultNameBlockchainInfo, ResultTypeBlockchainInfo). - RegisterImplementation(&ResultBlock{}, ResultNameBlock, ResultTypeBlock). - RegisterImplementation(&ResultCommit{}, ResultNameCommit, ResultTypeCommit). - RegisterImplementation(&ResultStatus{}, ResultNameStatus, ResultTypeStatus). - RegisterImplementation(&ResultNetInfo{}, ResultNameNetInfo, ResultTypeNetInfo). - RegisterImplementation(&ResultDialSeeds{}, ResultNameDialSeeds, ResultTypeDialSeeds). - RegisterImplementation(&ResultValidators{}, ResultNameValidators, ResultTypeValidators). - RegisterImplementation(&ResultDumpConsensusState{}, ResultNameDumpConsensusState, ResultTypeDumpConsensusState). - RegisterImplementation(&ResultBroadcastTx{}, ResultNameBroadcastTx, ResultTypeBroadcastTx). - RegisterImplementation(&ResultBroadcastTxCommit{}, ResultNameBroadcastTxCommit, ResultTypeBroadcastTxCommit). - RegisterImplementation(&ResultTx{}, ResultNameTx, ResultTypeTx). - RegisterImplementation(&ResultUnconfirmedTxs{}, ResultNameUnconfirmedTxs, ResultTypeUnconfirmedTxs). - RegisterImplementation(&ResultSubscribe{}, ResultNameSubscribe, ResultTypeSubscribe). - RegisterImplementation(&ResultUnsubscribe{}, ResultNameUnsubscribe, ResultTypeUnsubscribe). - RegisterImplementation(&ResultEvent{}, ResultNameEvent, ResultTypeEvent). - RegisterImplementation(&ResultUnsafeSetConfig{}, ResultNameUnsafeSetConfig, ResultTypeUnsafeSetConfig). - RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeStartCPUProfiler, ResultTypeUnsafeStartCPUProfiler). - RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeStopCPUProfiler, ResultTypeUnsafeStopCPUProfiler). - RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeWriteHeapProfile, ResultTypeUnsafeWriteHeapProfile). - RegisterImplementation(&ResultUnsafeFlushMempool{}, ResultNameUnsafeFlushMempool, ResultTypeUnsafeFlushMempool). - RegisterImplementation(&ResultABCIQuery{}, ResultNameABCIQuery, ResultTypeABCIQuery). - RegisterImplementation(&ResultABCIInfo{}, ResultNameABCIInfo, ResultTypeABCIInfo) diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index 45ff8b8ac..55963f506 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/pkg/errors" - wire "github.com/tendermint/go-wire" types "github.com/tendermint/tendermint/rpc/lib/types" ) @@ -70,15 +69,15 @@ func NewJSONRPCClient(remote string) *JSONRPCClient { func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { // we need this step because we attempt to decode values using `go-wire` // (handlers.go:176) on the server side - encodedParams := make(map[string]interface{}) - for k, v := range params { - bytes := json.RawMessage(wire.JSONBytes(v)) - encodedParams[k] = &bytes - } + // encodedParams := make(map[string]interface{}) + // for k, v := range params { + // bytes := json.RawMessage(wire.JSONBytes(v)) + // encodedParams[k] = &bytes + // } request := types.RPCRequest{ JSONRPC: "2.0", Method: method, - Params: encodedParams, + Params: params, ID: "", } requestBytes, err := json.Marshal(request) @@ -153,7 +152,7 @@ func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface return nil, errors.Errorf("Response error: %v", errorStr) } // unmarshal the RawMessage into the result - result = wire.ReadJSONPtr(result, *response.Result, &err) + err = json.Unmarshal(*response.Result, result) if err != nil { return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err) } @@ -176,8 +175,6 @@ func argsToURLValues(args map[string]interface{}) (url.Values, error) { } func argsToJson(args map[string]interface{}) error { - var n int - var err error for k, v := range args { rt := reflect.TypeOf(v) isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 @@ -188,12 +185,11 @@ func argsToJson(args map[string]interface{}) error { } // Pass everything else to go-wire - buf := new(bytes.Buffer) - wire.WriteJSON(v, buf, &n, &err) + data, err := json.Marshal(v) if err != nil { return err } - args[k] = buf.String() + args[k] = string(data) } return nil } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index ad922dd68..8884068b9 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -8,9 +8,8 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" - cmn "github.com/tendermint/tmlibs/common" types "github.com/tendermint/tendermint/rpc/lib/types" - wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" ) const ( @@ -157,15 +156,15 @@ func (wsc *WSClient) Unsubscribe(eventid string) error { func (wsc *WSClient) Call(method string, params map[string]interface{}) error { // we need this step because we attempt to decode values using `go-wire` // (handlers.go:470) on the server side - encodedParams := make(map[string]interface{}) - for k, v := range params { - bytes := json.RawMessage(wire.JSONBytes(v)) - encodedParams[k] = &bytes - } + // encodedParams := make(map[string]interface{}) + // for k, v := range params { + // bytes := json.RawMessage(wire.JSONBytes(v)) + // encodedParams[k] = &bytes + // } err := wsc.WriteJSON(types.RPCRequest{ JSONRPC: "2.0", Method: method, - Params: encodedParams, + Params: params, ID: "", }) return err diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index f42fd274c..a14551f4f 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -3,6 +3,7 @@ package rpc import ( "bytes" crand "crypto/rand" + "encoding/json" "fmt" "math/rand" "net/http" @@ -12,7 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - wire "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire/data" client "github.com/tendermint/tendermint/rpc/lib/client" server "github.com/tendermint/tendermint/rpc/lib/server" types "github.com/tendermint/tendermint/rpc/lib/types" @@ -28,42 +29,51 @@ const ( websocketEndpoint = "/websocket/endpoint" ) -// Define a type for results and register concrete versions -type Result interface{} - type ResultEcho struct { - Value string + Value string `json:"value"` +} + +type ResultEchoInt struct { + Value int `json:"value"` } type ResultEchoBytes struct { - Value []byte + Value []byte `json:"value"` } -var _ = wire.RegisterInterface( - struct{ Result }{}, - wire.ConcreteType{&ResultEcho{}, 0x1}, - wire.ConcreteType{&ResultEchoBytes{}, 0x2}, -) +type ResultEchoDataBytes struct { + Value data.Bytes `json:"value"` +} // Define some routes var Routes = map[string]*server.RPCFunc{ - "echo": server.NewRPCFunc(EchoResult, "arg"), - "echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"), - "echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"), + "echo": server.NewRPCFunc(EchoResult, "arg"), + "echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"), + "echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"), + "echo_data_bytes": server.NewRPCFunc(EchoDataBytesResult, "arg"), + "echo_int": server.NewRPCFunc(EchoIntResult, "arg"), } -func EchoResult(v string) (Result, error) { +func EchoResult(v string) (*ResultEcho, error) { return &ResultEcho{v}, nil } -func EchoWSResult(wsCtx types.WSRPCContext, v string) (Result, error) { +func EchoWSResult(wsCtx types.WSRPCContext, v string) (*ResultEcho, error) { return &ResultEcho{v}, nil } -func EchoBytesResult(v []byte) (Result, error) { +func EchoIntResult(v int) (*ResultEchoInt, error) { + return &ResultEchoInt{v}, nil +} + +func EchoBytesResult(v []byte) (*ResultEchoBytes, error) { return &ResultEchoBytes{v}, nil } +func EchoDataBytesResult(v data.Bytes) (*ResultEchoDataBytes, error) { + return &ResultEchoDataBytes{v}, nil +} + // launch unix and tcp servers func init() { cmd := exec.Command("rm", "-f", unixSocket) @@ -105,22 +115,44 @@ func echoViaHTTP(cl client.HTTPClient, val string) (string, error) { params := map[string]interface{}{ "arg": val, } - var result Result - if _, err := cl.Call("echo", params, &result); err != nil { + result := new(ResultEcho) + if _, err := cl.Call("echo", params, result); err != nil { return "", err } - return result.(*ResultEcho).Value, nil + return result.Value, nil +} + +func echoIntViaHTTP(cl client.HTTPClient, val int) (int, error) { + params := map[string]interface{}{ + "arg": val, + } + result := new(ResultEchoInt) + if _, err := cl.Call("echo_int", params, result); err != nil { + return 0, err + } + return result.Value, nil } func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) { params := map[string]interface{}{ "arg": bytes, } - var result Result - if _, err := cl.Call("echo_bytes", params, &result); err != nil { + result := new(ResultEchoBytes) + if _, err := cl.Call("echo_bytes", params, result); err != nil { + return []byte{}, err + } + return result.Value, nil +} + +func echoDataBytesViaHTTP(cl client.HTTPClient, bytes data.Bytes) (data.Bytes, error) { + params := map[string]interface{}{ + "arg": bytes, + } + result := new(ResultEchoDataBytes) + if _, err := cl.Call("echo_data_bytes", params, result); err != nil { return []byte{}, err } - return result.(*ResultEchoBytes).Value, nil + return result.Value, nil } func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { @@ -133,6 +165,16 @@ func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { got2, err := echoBytesViaHTTP(cl, val2) require.Nil(t, err) assert.Equal(t, got2, val2) + + val3 := data.Bytes(randBytes(t)) + got3, err := echoDataBytesViaHTTP(cl, val3) + require.Nil(t, err) + assert.Equal(t, got3, val3) + + val4 := rand.Intn(10000) + got4, err := echoIntViaHTTP(cl, val4) + require.Nil(t, err) + assert.Equal(t, got4, val4) } func echoViaWS(cl *client.WSClient, val string) (string, error) { @@ -146,12 +188,12 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) { select { case msg := <-cl.ResultsCh: - result := new(Result) - wire.ReadJSONPtr(result, msg, &err) + result := new(ResultEcho) + err = json.Unmarshal(msg, result) if err != nil { return "", nil } - return (*result).(*ResultEcho).Value, nil + return result.Value, nil case err := <-cl.ErrorsCh: return "", err } @@ -168,12 +210,12 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { select { case msg := <-cl.ResultsCh: - result := new(Result) - wire.ReadJSONPtr(result, msg, &err) + result := new(ResultEchoBytes) + err = json.Unmarshal(msg, result) if err != nil { return []byte{}, nil } - return (*result).(*ResultEchoBytes).Value, nil + return result.Value, nil case err := <-cl.ErrorsCh: return []byte{}, err } @@ -241,20 +283,15 @@ func TestWSNewWSRPCFunc(t *testing.T) { params := map[string]interface{}{ "arg": val, } - err = cl.WriteJSON(types.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "echo_ws", - Params: params, - }) + err = cl.Call("echo_ws", params) require.Nil(t, err) select { case msg := <-cl.ResultsCh: - result := new(Result) - wire.ReadJSONPtr(result, msg, &err) + result := new(ResultEcho) + err = json.Unmarshal(msg, result) require.Nil(t, err) - got := (*result).(*ResultEcho).Value + got := result.Value assert.Equal(t, got, val) case err := <-cl.ErrorsCh: t.Fatal(err) @@ -279,10 +316,10 @@ func TestWSHandlesArrayParams(t *testing.T) { select { case msg := <-cl.ResultsCh: - result := new(Result) - wire.ReadJSONPtr(result, msg, &err) + result := new(ResultEcho) + err = json.Unmarshal(msg, result) require.Nil(t, err) - got := (*result).(*ResultEcho).Value + got := result.Value assert.Equal(t, got, val) case err := <-cl.ErrorsCh: t.Fatalf("%+v", err) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index d7dffd402..51eaf1401 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -14,7 +14,7 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" - wire "github.com/tendermint/go-wire" + //wire "github.com/tendermint/go-wire" types "github.com/tendermint/tendermint/rpc/lib/types" cmn "github.com/tendermint/tmlibs/common" events "github.com/tendermint/tmlibs/events" @@ -204,7 +204,7 @@ func jsonParamsToArgsWS(rpcFunc *RPCFunc, paramsI interface{}, wsCtx types.WSRPC func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { var err error v := reflect.New(ty) - wire.ReadJSONObjectPtr(v.Interface(), object, &err) + readJSONObjectPtr(v.Interface(), object, &err) if err != nil { return v, err } @@ -280,9 +280,8 @@ func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error } func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { - var err error v := reflect.New(ty) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) + err := json.Unmarshal([]byte(arg), v.Interface()) if err != nil { return v, err } @@ -315,9 +314,8 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { } if isQuotedString && expectingByteSlice { - var err error v := reflect.New(reflect.TypeOf("")) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) + err := json.Unmarshal([]byte(arg), v.Interface()) if err != nil { return reflect.ValueOf(nil), err, false } diff --git a/rpc/lib/server/parse_test.go b/rpc/lib/server/parse_test.go new file mode 100644 index 000000000..5b0478b64 --- /dev/null +++ b/rpc/lib/server/parse_test.go @@ -0,0 +1,133 @@ +package rpcserver + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/go-wire/data" +) + +func TestParseJSONMap(t *testing.T) { + assert := assert.New(t) + + input := []byte(`{"value":"1234","height":22}`) + + // naive is float,string + var p1 map[string]interface{} + err := json.Unmarshal(input, &p1) + if assert.Nil(err) { + h, ok := p1["height"].(float64) + if assert.True(ok, "%#v", p1["height"]) { + assert.EqualValues(22, h) + } + v, ok := p1["value"].(string) + if assert.True(ok, "%#v", p1["value"]) { + assert.EqualValues("1234", v) + } + } + + // preloading map with values doesn't help + tmp := 0 + p2 := map[string]interface{}{ + "value": &data.Bytes{}, + "height": &tmp, + } + err = json.Unmarshal(input, &p2) + if assert.Nil(err) { + h, ok := p2["height"].(float64) + if assert.True(ok, "%#v", p2["height"]) { + assert.EqualValues(22, h) + } + v, ok := p2["value"].(string) + if assert.True(ok, "%#v", p2["value"]) { + assert.EqualValues("1234", v) + } + } + + // preload here with *pointers* to the desired types + // struct has unknown types, but hard-coded keys + tmp = 0 + p3 := struct { + Value interface{} `json:"value"` + Height interface{} `json:"height"` + }{ + Height: &tmp, + Value: &data.Bytes{}, + } + err = json.Unmarshal(input, &p3) + if assert.Nil(err) { + h, ok := p3.Height.(*int) + if assert.True(ok, "%#v", p3.Height) { + assert.Equal(22, *h) + } + v, ok := p3.Value.(*data.Bytes) + if assert.True(ok, "%#v", p3.Value) { + assert.EqualValues([]byte{0x12, 0x34}, *v) + } + } + + // simplest solution, but hard-coded + p4 := struct { + Value data.Bytes `json:"value"` + Height int `json:"height"` + }{} + err = json.Unmarshal(input, &p4) + if assert.Nil(err) { + assert.EqualValues(22, p4.Height) + assert.EqualValues([]byte{0x12, 0x34}, p4.Value) + } + + // so, let's use this trick... + // dynamic keys on map, and we can deserialize to the desired types + var p5 map[string]*json.RawMessage + err = json.Unmarshal(input, &p5) + if assert.Nil(err) { + var h int + err = json.Unmarshal(*p5["height"], &h) + if assert.Nil(err) { + assert.Equal(22, h) + } + + var v data.Bytes + err = json.Unmarshal(*p5["value"], &v) + if assert.Nil(err) { + assert.Equal(data.Bytes{0x12, 0x34}, v) + } + } +} + +func TestParseJSONArray(t *testing.T) { + assert := assert.New(t) + + input := []byte(`["1234",22]`) + + // naive is float,string + var p1 []interface{} + err := json.Unmarshal(input, &p1) + if assert.Nil(err) { + v, ok := p1[0].(string) + if assert.True(ok, "%#v", p1[0]) { + assert.EqualValues("1234", v) + } + h, ok := p1[1].(float64) + if assert.True(ok, "%#v", p1[1]) { + assert.EqualValues(22, h) + } + } + + // preloading map with values helps here (unlike map - p2 above) + tmp := 0 + p2 := []interface{}{&data.Bytes{}, &tmp} + err = json.Unmarshal(input, &p2) + if assert.Nil(err) { + v, ok := p2[0].(*data.Bytes) + if assert.True(ok, "%#v", p2[0]) { + assert.EqualValues([]byte{0x12, 0x34}, *v) + } + h, ok := p2[1].(*int) + if assert.True(ok, "%#v", p2[1]) { + assert.EqualValues(22, *h) + } + } +} diff --git a/rpc/lib/server/wire.go b/rpc/lib/server/wire.go new file mode 100644 index 000000000..ad3d61f8c --- /dev/null +++ b/rpc/lib/server/wire.go @@ -0,0 +1,290 @@ +package rpcserver + +import ( + "encoding/base64" + "encoding/hex" + "reflect" + "time" + + "github.com/pkg/errors" + + "github.com/tendermint/go-wire" + "github.com/tendermint/go-wire/data" + cmn "github.com/tendermint/tmlibs/common" +) + +var ( + timeType = wire.GetTypeFromStructDeclaration(struct{ time.Time }{}) +) + +func readJSONObjectPtr(o interface{}, object interface{}, err *error) interface{} { + rv, rt := reflect.ValueOf(o), reflect.TypeOf(o) + if rv.Kind() == reflect.Ptr { + readReflectJSON(rv.Elem(), rt.Elem(), wire.Options{}, object, err) + } else { + cmn.PanicSanity("ReadJSON(Object)Ptr expects o to be a pointer") + } + return o +} + +func readByteJSON(o interface{}) (typeByte byte, rest interface{}, err error) { + oSlice, ok := o.([]interface{}) + if !ok { + err = errors.New(cmn.Fmt("Expected type [Byte,?] but got type %v", reflect.TypeOf(o))) + return + } + if len(oSlice) != 2 { + err = errors.New(cmn.Fmt("Expected [Byte,?] len 2 but got len %v", len(oSlice))) + return + } + typeByte_, ok := oSlice[0].(float64) + typeByte = byte(typeByte_) + rest = oSlice[1] + return +} + +// Contract: Caller must ensure that rt is supported +// (e.g. is recursively composed of supported native types, and structs and slices.) +// rv and rt refer to the object we're unmarhsaling into, whereas o is the result of naiive json unmarshal (map[string]interface{}) +func readReflectJSON(rv reflect.Value, rt reflect.Type, opts wire.Options, o interface{}, err *error) { + + // Get typeInfo + typeInfo := wire.GetTypeInfo(rt) + + if rt.Kind() == reflect.Interface { + if !typeInfo.IsRegisteredInterface { + // There's no way we can read such a thing. + *err = errors.New(cmn.Fmt("Cannot read unregistered interface type %v", rt)) + return + } + if o == nil { + return // nil + } + typeByte, rest, err_ := readByteJSON(o) + if err_ != nil { + *err = err_ + return + } + crt, ok := typeInfo.ByteToType[typeByte] + if !ok { + *err = errors.New(cmn.Fmt("Byte %X not registered for interface %v", typeByte, rt)) + return + } + if crt.Kind() == reflect.Ptr { + crt = crt.Elem() + crv := reflect.New(crt) + readReflectJSON(crv.Elem(), crt, opts, rest, err) + rv.Set(crv) // NOTE: orig rv is ignored. + } else { + crv := reflect.New(crt).Elem() + readReflectJSON(crv, crt, opts, rest, err) + rv.Set(crv) // NOTE: orig rv is ignored. + } + return + } + + if rt.Kind() == reflect.Ptr { + if o == nil { + return // nil + } + // Create new struct if rv is nil. + if rv.IsNil() { + newRv := reflect.New(rt.Elem()) + rv.Set(newRv) + rv = newRv + } + // Dereference pointer + rv, rt = rv.Elem(), rt.Elem() + typeInfo = wire.GetTypeInfo(rt) + // continue... + } + + switch rt.Kind() { + case reflect.Array: + elemRt := rt.Elem() + length := rt.Len() + if elemRt.Kind() == reflect.Uint8 { + // Special case: Bytearrays + oString, ok := o.(string) + if !ok { + *err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o))) + return + } + + // if its data.Bytes, use hex; else use base64 + dbty := reflect.TypeOf(data.Bytes{}) + var buf []byte + var err_ error + if rt == dbty { + buf, err_ = hex.DecodeString(oString) + } else { + buf, err_ = base64.StdEncoding.DecodeString(oString) + } + if err_ != nil { + *err = err_ + return + } + if len(buf) != length { + *err = errors.New(cmn.Fmt("Expected bytearray of length %v but got %v", length, len(buf))) + return + } + //log.Info("Read bytearray", "bytes", buf) + reflect.Copy(rv, reflect.ValueOf(buf)) + } else { + oSlice, ok := o.([]interface{}) + if !ok { + *err = errors.New(cmn.Fmt("Expected array of %v but got type %v", rt, reflect.TypeOf(o))) + return + } + if len(oSlice) != length { + *err = errors.New(cmn.Fmt("Expected array of length %v but got %v", length, len(oSlice))) + return + } + for i := 0; i < length; i++ { + elemRv := rv.Index(i) + readReflectJSON(elemRv, elemRt, opts, oSlice[i], err) + } + //log.Info("Read x-array", "x", elemRt, "length", length) + } + + case reflect.Slice: + elemRt := rt.Elem() + if elemRt.Kind() == reflect.Uint8 { + // Special case: Byteslices + oString, ok := o.(string) + if !ok { + *err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o))) + return + } + // if its data.Bytes, use hex; else use base64 + dbty := reflect.TypeOf(data.Bytes{}) + var buf []byte + var err_ error + if rt == dbty { + buf, err_ = hex.DecodeString(oString) + } else { + buf, err_ = base64.StdEncoding.DecodeString(oString) + } + if err_ != nil { + *err = err_ + return + } + //log.Info("Read byteslice", "bytes", byteslice) + rv.Set(reflect.ValueOf(buf)) + } else { + // Read length + oSlice, ok := o.([]interface{}) + if !ok { + *err = errors.New(cmn.Fmt("Expected array of %v but got type %v", rt, reflect.TypeOf(o))) + return + } + length := len(oSlice) + //log.Info("Read slice", "length", length) + sliceRv := reflect.MakeSlice(rt, length, length) + // Read elems + for i := 0; i < length; i++ { + elemRv := sliceRv.Index(i) + readReflectJSON(elemRv, elemRt, opts, oSlice[i], err) + } + rv.Set(sliceRv) + } + + case reflect.Struct: + if rt == timeType { + // Special case: time.Time + str, ok := o.(string) + if !ok { + *err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o))) + return + } + // try three ways, seconds, milliseconds, or microseconds... + t, err_ := time.Parse(time.RFC3339Nano, str) + if err_ != nil { + *err = err_ + return + } + rv.Set(reflect.ValueOf(t)) + } else { + if typeInfo.Unwrap { + f := typeInfo.Fields[0] + fieldIdx, fieldType, opts := f.Index, f.Type, f.Options + fieldRv := rv.Field(fieldIdx) + readReflectJSON(fieldRv, fieldType, opts, o, err) + } else { + oMap, ok := o.(map[string]interface{}) + if !ok { + *err = errors.New(cmn.Fmt("Expected map but got type %v", reflect.TypeOf(o))) + return + } + // TODO: ensure that all fields are set? + // TODO: disallow unknown oMap fields? + for _, fieldInfo := range typeInfo.Fields { + f := fieldInfo + fieldIdx, fieldType, opts := f.Index, f.Type, f.Options + value, ok := oMap[opts.JSONName] + if !ok { + continue // Skip missing fields. + } + fieldRv := rv.Field(fieldIdx) + readReflectJSON(fieldRv, fieldType, opts, value, err) + } + } + } + + case reflect.String: + str, ok := o.(string) + if !ok { + *err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o))) + return + } + //log.Info("Read string", "str", str) + rv.SetString(str) + + case reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8, reflect.Int: + num, ok := o.(float64) + if !ok { + *err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o))) + return + } + //log.Info("Read num", "num", num) + rv.SetInt(int64(num)) + + case reflect.Uint64, reflect.Uint32, reflect.Uint16, reflect.Uint8, reflect.Uint: + num, ok := o.(float64) + if !ok { + *err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o))) + return + } + if num < 0 { + *err = errors.New(cmn.Fmt("Expected unsigned numeric but got %v", num)) + return + } + //log.Info("Read num", "num", num) + rv.SetUint(uint64(num)) + + case reflect.Float64, reflect.Float32: + if !opts.Unsafe { + *err = errors.New("Wire float* support requires `wire:\"unsafe\"`") + return + } + num, ok := o.(float64) + if !ok { + *err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o))) + return + } + //log.Info("Read num", "num", num) + rv.SetFloat(num) + + case reflect.Bool: + bl, ok := o.(bool) + if !ok { + *err = errors.New(cmn.Fmt("Expected boolean but got type %v", reflect.TypeOf(o))) + return + } + //log.Info("Read boolean", "boolean", bl) + rv.SetBool(bl) + + default: + cmn.PanicSanity(cmn.Fmt("Unknown field type %v", rt.Kind())) + } +} diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 9c5f2625b..28ddfc0ff 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -4,7 +4,6 @@ import ( "encoding/json" "strings" - wire "github.com/tendermint/go-wire" events "github.com/tendermint/tmlibs/events" ) @@ -52,8 +51,14 @@ type RPCResponse struct { func NewRPCResponse(id string, res interface{}, err string) RPCResponse { var raw *json.RawMessage if res != nil { - rawMsg := json.RawMessage(wire.JSONBytes(res)) - raw = &rawMsg + var js []byte + js, err2 := json.Marshal(res) + if err2 == nil { + rawMsg := json.RawMessage(js) + raw = &rawMsg + } else { + err = err2.Error() + } } return RPCResponse{ JSONRPC: "2.0", diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index c9a922d72..aed179d47 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/abci/types" + "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpc "github.com/tendermint/tendermint/rpc/lib/client" @@ -38,12 +39,11 @@ func TestJSONStatus(t *testing.T) { func testStatus(t *testing.T, client rpc.HTTPClient) { chainID := GetConfig().GetString("chain_id") - tmResult := new(ctypes.TMResult) - _, err := client.Call("status", map[string]interface{}{}, tmResult) + result := new(ctypes.ResultStatus) + _, err := client.Call("status", map[string]interface{}{}, result) require.Nil(t, err) - status := tmResult.Unwrap().(*ctypes.ResultStatus) - assert.Equal(t, chainID, status.NodeInfo.Network) + assert.Equal(t, chainID, result.NodeInfo.Network) } //-------------------------------------------------------------------------------- @@ -69,13 +69,12 @@ func TestJSONBroadcastTxSync(t *testing.T) { func testBroadcastTxSync(t *testing.T, client rpc.HTTPClient) { mem := node.MempoolReactor().Mempool initMemSize := mem.Size() - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultBroadcastTx) tx := randBytes(t) - _, err := client.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) + _, err := client.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, result) require.Nil(t, err) - res := tmResult.Unwrap().(*ctypes.ResultBroadcastTx) - require.Equal(t, abci.CodeType_OK, res.Code) + require.Equal(t, abci.CodeType_OK, result.Code) require.Equal(t, initMemSize+1, mem.Size()) txs := mem.Reap(1) require.EqualValues(t, tx, txs[0]) @@ -92,14 +91,13 @@ func testTxKV(t *testing.T) ([]byte, []byte, types.Tx) { } func sendTx(t *testing.T, client rpc.HTTPClient) ([]byte, []byte) { - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultBroadcastTxCommit) k, v, tx := testTxKV(t) - _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) + _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) require.Nil(t, err) - bres := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit) - require.NotNil(t, 0, bres.DeliverTx, "%#v", bres) - require.EqualValues(t, 0, bres.CheckTx.Code, "%#v", bres) - require.EqualValues(t, 0, bres.DeliverTx.Code, "%#v", bres) + require.NotNil(t, 0, result.DeliverTx, "%#v", result) + require.EqualValues(t, 0, result.CheckTx.Code, "%#v", result) + require.EqualValues(t, 0, result.DeliverTx.Code, "%#v", result) return k, v } @@ -114,16 +112,15 @@ func TestJSONABCIQuery(t *testing.T) { func testABCIQuery(t *testing.T, client rpc.HTTPClient) { k, _ := sendTx(t, client) time.Sleep(time.Millisecond * 500) - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultABCIQuery) _, err := client.Call("abci_query", - map[string]interface{}{"path": "", "data": k, "prove": false}, tmResult) + map[string]interface{}{"path": "", "data": data.Bytes(k), "prove": false}, result) require.Nil(t, err) - resQuery := tmResult.Unwrap().(*ctypes.ResultABCIQuery) - require.EqualValues(t, 0, resQuery.Code) + require.EqualValues(t, 0, result.Code) // XXX: specific to value returned by the dummy - require.NotEqual(t, 0, len(resQuery.Value)) + require.NotEqual(t, 0, len(result.Value)) } //-------------------------------------------------------------------------------- @@ -140,15 +137,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) { func testBroadcastTxCommit(t *testing.T, client rpc.HTTPClient) { require := require.New(t) - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultBroadcastTxCommit) tx := randBytes(t) - _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) + _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) require.Nil(err) - res := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit) - checkTx := res.CheckTx + checkTx := result.CheckTx require.Equal(abci.CodeType_OK, checkTx.Code) - deliverTx := res.DeliverTx + deliverTx := result.DeliverTx require.Equal(abci.CodeType_OK, deliverTx.Code) mem := node.MempoolReactor().Mempool require.Equal(0, mem.Size()) @@ -178,16 +174,15 @@ func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) { assert, require := assert.New(t), require.New(t) // first we broadcast a tx - tmResult := new(ctypes.TMResult) + result := new(ctypes.ResultBroadcastTxCommit) txBytes := randBytes(t) tx := types.Tx(txBytes) - _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, tmResult) + _, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, result) require.Nil(err) - res := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit) - checkTx := res.CheckTx + checkTx := result.CheckTx require.Equal(abci.CodeType_OK, checkTx.Code) - deliverTx := res.DeliverTx + deliverTx := result.DeliverTx require.Equal(abci.CodeType_OK, deliverTx.Code) mem := node.MempoolReactor().Mempool require.Equal(0, mem.Size()) @@ -214,24 +209,23 @@ func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) { // now we query for the tx. // since there's only one tx, we know index=0. - tmResult = new(ctypes.TMResult) + result2 := new(ctypes.ResultTx) query := map[string]interface{}{ "hash": tc.hash, "prove": tc.prove, } - _, err = client.Call("tx", query, tmResult) + _, err = client.Call("tx", query, result2) valid := (withIndexer && tc.valid) if !valid { require.NotNil(err, idx) } else { require.Nil(err, idx) - res2 := tmResult.Unwrap().(*ctypes.ResultTx) - assert.Equal(tx, res2.Tx, idx) - assert.Equal(res.Height, res2.Height, idx) - assert.Equal(0, res2.Index, idx) - assert.Equal(abci.CodeType_OK, res2.TxResult.Code, idx) + assert.Equal(tx, result2.Tx, idx) + assert.Equal(result.Height, result2.Height, idx) + assert.Equal(0, result2.Index, idx) + assert.Equal(abci.CodeType_OK, result2.TxResult.Code, idx) // time to verify the proof - proof := res2.Proof + proof := result2.Proof if tc.prove && assert.Equal(tx, proof.Data, idx) { assert.True(proof.Proof.Verify(proof.Index, proof.Total, tx.Hash(), proof.RootHash), idx) } @@ -286,7 +280,7 @@ func TestWSBlockchainGrowth(t *testing.T) { var initBlockN int for i := 0; i < 3; i++ { waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { - block := eventData.(types.EventDataNewBlock).Block + block := eventData.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block if i == 0 { initBlockN = block.Header.Height } else { @@ -315,12 +309,12 @@ func TestWSTxEvent(t *testing.T) { }() // send an tx - tmResult := new(ctypes.TMResult) - _, err := GetJSONClient().Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) + result := new(ctypes.ResultBroadcastTx) + _, err := GetJSONClient().Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, result) require.Nil(err) waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { - evt, ok := b.(types.EventDataTx) + evt, ok := b.(types.TMEventData).Unwrap().(types.EventDataTx) require.True(ok, "Got wrong event type: %#v", b) require.Equal(tx, []byte(evt.Tx), "Returned different tx") require.Equal(abci.CodeType_OK, evt.Code) @@ -373,12 +367,12 @@ func TestWSDoubleFire(t *testing.T) { // //func TestURIUnsafeSetConfig(t *testing.T) { // for _, testCase := range testCasesUnsafeSetConfig { -// tmResult := new(ctypes.TMResult) +// result := new(ctypes.TMResult) // _, err := GetURIClient().Call("unsafe_set_config", map[string]interface{}{ // "type": testCase[0], // "key": testCase[1], // "value": testCase[2], -// }, tmResult) +// }, result) // require.Nil(t, err) // } // testUnsafeSetConfig(t) @@ -386,10 +380,10 @@ func TestWSDoubleFire(t *testing.T) { // //func TestJSONUnsafeSetConfig(t *testing.T) { // for _, testCase := range testCasesUnsafeSetConfig { -// tmResult := new(ctypes.TMResult) +// result := new(ctypes.TMResult) // _, err := GetJSONClient().Call("unsafe_set_config", // map[string]interface{}{"type": testCase[0], "key": testCase[1], "value": testCase[2]}, -// tmResult) +// result) // require.Nil(t, err) // } // testUnsafeSetConfig(t) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 622890596..a12d82f23 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -1,6 +1,7 @@ package rpctest import ( + "encoding/json" "fmt" "math/rand" "os" @@ -11,7 +12,6 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/require" - wire "github.com/tendermint/go-wire" logger "github.com/tendermint/tmlibs/logger" abci "github.com/tendermint/abci/types" @@ -131,15 +131,16 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo for { select { case r := <-wsc.ResultsCh: - result := new(ctypes.TMResult) - wire.ReadJSONPtr(result, r, &err) + fmt.Println("GOT IT", string(r)) + result := new(ctypes.ResultEvent) + err = json.Unmarshal(r, result) if err != nil { - errCh <- err - break LOOP + fmt.Println("POOP", err) + // cant distinguish between error and wrong type ... + continue } - event, ok := result.Unwrap().(*ctypes.ResultEvent) - if ok && event.Name == eventid { - goodCh <- event.Data + if result.Name == eventid { + goodCh <- result.Data break LOOP } case err := <-wsc.ErrorsCh: diff --git a/test/app/counter_test.sh b/test/app/counter_test.sh index 439926a5d..cc5c38b25 100644 --- a/test/app/counter_test.sh +++ b/test/app/counter_test.sh @@ -33,7 +33,7 @@ function sendTx() { ERROR=`echo $RESPONSE | jq .error` ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes - RESPONSE=`echo $RESPONSE | jq .result[1]` + RESPONSE=`echo $RESPONSE | jq .result` else if [ -f grpc_client ]; then rm grpc_client diff --git a/test/app/dummy_test.sh b/test/app/dummy_test.sh index 0449bc491..1a117c634 100644 --- a/test/app/dummy_test.sh +++ b/test/app/dummy_test.sh @@ -57,7 +57,7 @@ echo "... testing query with /abci_query 2" # we should be able to look up the key RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $KEY)&prove=false"` -RESPONSE=`echo $RESPONSE | jq .result[1].response.log` +RESPONSE=`echo $RESPONSE | jq .result.response.log` set +e A=`echo $RESPONSE | grep 'exists'` @@ -70,7 +70,7 @@ set -e # we should not be able to look up the value RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $VALUE)&prove=false"` -RESPONSE=`echo $RESPONSE | jq .result[1].response.log` +RESPONSE=`echo $RESPONSE | jq .result.response.log` set +e A=`echo $RESPONSE | grep 'exists'` if [[ $? == 0 ]]; then diff --git a/test/p2p/atomic_broadcast/test.sh b/test/p2p/atomic_broadcast/test.sh index 8e0633c8a..00b339631 100644 --- a/test/p2p/atomic_broadcast/test.sh +++ b/test/p2p/atomic_broadcast/test.sh @@ -17,7 +17,7 @@ for i in `seq 1 $N`; do addr=$(test/p2p/ip.sh $i):46657 # current state - HASH1=`curl -s $addr/status | jq .result[1].latest_app_hash` + HASH1=`curl -s $addr/status | jq .result.latest_app_hash` # - send a tx TX=aadeadbeefbeefbeef0$i @@ -26,15 +26,15 @@ for i in `seq 1 $N`; do echo "" # we need to wait another block to get the new app_hash - h1=`curl -s $addr/status | jq .result[1].latest_block_height` + h1=`curl -s $addr/status | jq .result.latest_block_height` h2=$h1 while [ "$h2" == "$h1" ]; do sleep 1 - h2=`curl -s $addr/status | jq .result[1].latest_block_height` + h2=`curl -s $addr/status | jq .result.latest_block_height` done # check that hash was updated - HASH2=`curl -s $addr/status | jq .result[1].latest_app_hash` + HASH2=`curl -s $addr/status | jq .result.latest_app_hash` if [[ "$HASH1" == "$HASH2" ]]; then echo "Expected state hash to update from $HASH1. Got $HASH2" exit 1 @@ -44,7 +44,7 @@ for i in `seq 1 $N`; do for j in `seq 1 $N`; do if [[ "$i" != "$j" ]]; then addrJ=$(test/p2p/ip.sh $j):46657 - HASH3=`curl -s $addrJ/status | jq .result[1].latest_app_hash` + HASH3=`curl -s $addrJ/status | jq .result.latest_app_hash` if [[ "$HASH2" != "$HASH3" ]]; then echo "App hash for node $j doesn't match. Got $HASH3, expected $HASH2" diff --git a/test/p2p/basic/test.sh b/test/p2p/basic/test.sh index 3399515a8..93444792b 100644 --- a/test/p2p/basic/test.sh +++ b/test/p2p/basic/test.sh @@ -31,19 +31,19 @@ for i in `seq 1 $N`; do N_1=$(($N - 1)) # - assert everyone has N-1 other peers - N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'` + N_PEERS=`curl -s $addr/net_info | jq '.result.peers | length'` while [ "$N_PEERS" != $N_1 ]; do echo "Waiting for node $i to connect to all peers ..." sleep 1 - N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'` + N_PEERS=`curl -s $addr/net_info | jq '.result.peers | length'` done # - assert block height is greater than 1 - BLOCK_HEIGHT=`curl -s $addr/status | jq .result[1].latest_block_height` + BLOCK_HEIGHT=`curl -s $addr/status | jq .result.latest_block_height` while [ "$BLOCK_HEIGHT" -le 1 ]; do echo "Waiting for node $i to commit a block ..." sleep 1 - BLOCK_HEIGHT=`curl -s $addr/status | jq .result[1].latest_block_height` + BLOCK_HEIGHT=`curl -s $addr/status | jq .result.latest_block_height` done echo "Node $i is connected to all peers and at block $BLOCK_HEIGHT" done diff --git a/test/p2p/fast_sync/check_peer.sh b/test/p2p/fast_sync/check_peer.sh index c459277d2..b10c3efc5 100644 --- a/test/p2p/fast_sync/check_peer.sh +++ b/test/p2p/fast_sync/check_peer.sh @@ -15,10 +15,10 @@ peerID=$(( $(($ID % 4)) + 1 )) # 1->2 ... 3->4 ... 4->1 peer_addr=$(test/p2p/ip.sh $peerID):46657 # get another peer's height -h1=`curl -s $peer_addr/status | jq .result[1].latest_block_height` +h1=`curl -s $peer_addr/status | jq .result.latest_block_height` # get another peer's state -root1=`curl -s $peer_addr/status | jq .result[1].latest_app_hash` +root1=`curl -s $peer_addr/status | jq .result.latest_app_hash` echo "Other peer is on height $h1 with state $root1" echo "Waiting for peer $ID to catch up" @@ -29,12 +29,12 @@ set +o pipefail h2="0" while [[ "$h2" -lt "$(($h1+3))" ]]; do sleep 1 - h2=`curl -s $addr/status | jq .result[1].latest_block_height` + h2=`curl -s $addr/status | jq .result.latest_block_height` echo "... $h2" done # check the app hash -root2=`curl -s $addr/status | jq .result[1].latest_app_hash` +root2=`curl -s $addr/status | jq .result.latest_app_hash` if [[ "$root1" != "$root2" ]]; then echo "App hash after fast sync does not match. Got $root2; expected $root1" diff --git a/test/p2p/kill_all/check_peers.sh b/test/p2p/kill_all/check_peers.sh index d085a025c..52dcde91c 100644 --- a/test/p2p/kill_all/check_peers.sh +++ b/test/p2p/kill_all/check_peers.sh @@ -23,7 +23,7 @@ set -e # get the first peer's height addr=$(test/p2p/ip.sh 1):46657 -h1=$(curl -s "$addr/status" | jq .result[1].latest_block_height) +h1=$(curl -s "$addr/status" | jq .result.latest_block_height) echo "1st peer is on height $h1" echo "Waiting until other peers reporting a height higher than the 1st one" @@ -33,14 +33,14 @@ for i in $(seq 2 "$NUM_OF_PEERS"); do while [[ $hi -le $h1 ]] ; do addr=$(test/p2p/ip.sh "$i"):46657 - hi=$(curl -s "$addr/status" | jq .result[1].latest_block_height) + hi=$(curl -s "$addr/status" | jq .result.latest_block_height) echo "... peer $i is on height $hi" ((attempt++)) if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then echo "$attempt unsuccessful attempts were made to catch up" - curl -s "$addr/dump_consensus_state" | jq .result[1] + curl -s "$addr/dump_consensus_state" | jq .result exit 1 fi diff --git a/test/p2p/pex/check_peer.sh b/test/p2p/pex/check_peer.sh index ceabd2ac6..e851c8923 100644 --- a/test/p2p/pex/check_peer.sh +++ b/test/p2p/pex/check_peer.sh @@ -10,7 +10,7 @@ echo "2. wait until peer $ID connects to other nodes using pex reactor" peers_count="0" while [[ "$peers_count" -lt "$((N-1))" ]]; do sleep 1 - peers_count=$(curl -s "$addr/net_info" | jq ".result[1].peers | length") + peers_count=$(curl -s "$addr/net_info" | jq ".result.peers | length") echo "... peers count = $peers_count, expected = $((N-1))" done diff --git a/test/persist/test_failure_indices.sh b/test/persist/test_failure_indices.sh index bca8b8ae9..9e3c8f19e 100644 --- a/test/persist/test_failure_indices.sh +++ b/test/persist/test_failure_indices.sh @@ -107,11 +107,11 @@ for failIndex in $(seq $failsStart $failsEnd); do done # wait for a new block - h1=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result[1].latest_block_height) + h1=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result.latest_block_height) h2=$h1 while [ "$h2" == "$h1" ]; do sleep 1 - h2=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result[1].latest_block_height) + h2=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result.latest_block_height) done kill_procs diff --git a/test/persist/test_simple.sh b/test/persist/test_simple.sh index 59bc38458..273c714ca 100644 --- a/test/persist/test_simple.sh +++ b/test/persist/test_simple.sh @@ -57,11 +57,11 @@ while [ "$ERR" != 0 ]; do done # wait for a new block -h1=`curl -s $addr/status | jq .result[1].latest_block_height` +h1=`curl -s $addr/status | jq .result.latest_block_height` h2=$h1 while [ "$h2" == "$h1" ]; do sleep 1 - h2=`curl -s $addr/status | jq .result[1].latest_block_height` + h2=`curl -s $addr/status | jq .result.latest_block_height` done kill_procs diff --git a/types/events.go b/types/events.go index 953859d0d..8c29c4445 100644 --- a/types/events.go +++ b/types/events.go @@ -3,7 +3,6 @@ package types import ( // for registering TMEventData as events.EventData abci "github.com/tendermint/abci/types" - "github.com/tendermint/go-wire" "github.com/tendermint/go-wire/data" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/events" @@ -34,10 +33,47 @@ func EventStringVote() string { return "Vote" } //---------------------------------------- +var ( + EventDataNameNewBlock = "new_block" + EventDataNameNewBlockHeader = "new_block_header" + EventDataNameTx = "tx" + EventDataNameRoundState = "round_state" + EventDataNameVote = "vote" +) + +//---------------------------------------- + // implements events.EventData -type TMEventData interface { +type TMEventDataInner interface { events.EventData - AssertIsTMEventData() +} + +type TMEventData struct { + TMEventDataInner `json:"unwrap"` +} + +func (tmr TMEventData) MarshalJSON() ([]byte, error) { + return tmEventDataMapper.ToJSON(tmr.TMEventDataInner) +} + +func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) { + parsed, err := tmEventDataMapper.FromJSON(data) + if err == nil && parsed != nil { + tmr.TMEventDataInner = parsed.(TMEventDataInner) + } + return +} + +func (tmr TMEventData) Unwrap() TMEventDataInner { + tmrI := tmr.TMEventDataInner + for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) { + tmrI = wrap.TMEventDataInner + } + return tmrI +} + +func (tmr TMEventData) Empty() bool { + return tmr.TMEventDataInner == nil } const ( @@ -50,15 +86,12 @@ const ( EventDataTypeVote = byte(0x12) ) -var _ = wire.RegisterInterface( - struct{ TMEventData }{}, - wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, - wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader}, - // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, - wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, - wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState}, - wire.ConcreteType{EventDataVote{}, EventDataTypeVote}, -) +var tmEventDataMapper = data.NewMapper(TMEventData{}). + RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock). + RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). + RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). + RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -147,55 +180,55 @@ func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEven //--- block, tx, and vote events func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { - fireEvent(fireable, EventStringNewBlock(), block) + fireEvent(fireable, EventStringNewBlock(), TMEventData{block}) } func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { - fireEvent(fireable, EventStringNewBlockHeader(), header) + fireEvent(fireable, EventStringNewBlockHeader(), TMEventData{header}) } func FireEventVote(fireable events.Fireable, vote EventDataVote) { - fireEvent(fireable, EventStringVote(), vote) + fireEvent(fireable, EventStringVote(), TMEventData{vote}) } func FireEventTx(fireable events.Fireable, tx EventDataTx) { - fireEvent(fireable, EventStringTx(tx.Tx), tx) + fireEvent(fireable, EventStringTx(tx.Tx), TMEventData{tx}) } //--- EventDataRoundState events func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRoundStep(), rs) + fireEvent(fireable, EventStringNewRoundStep(), TMEventData{rs}) } func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutPropose(), rs) + fireEvent(fireable, EventStringTimeoutPropose(), TMEventData{rs}) } func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutWait(), rs) + fireEvent(fireable, EventStringTimeoutWait(), TMEventData{rs}) } func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRound(), rs) + fireEvent(fireable, EventStringNewRound(), TMEventData{rs}) } func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringCompleteProposal(), rs) + fireEvent(fireable, EventStringCompleteProposal(), TMEventData{rs}) } func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringPolka(), rs) + fireEvent(fireable, EventStringPolka(), TMEventData{rs}) } func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringUnlock(), rs) + fireEvent(fireable, EventStringUnlock(), TMEventData{rs}) } func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringRelock(), rs) + fireEvent(fireable, EventStringRelock(), TMEventData{rs}) } func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringLock(), rs) + fireEvent(fireable, EventStringLock(), TMEventData{rs}) }