Browse Source

Merge pull request #467 from tendermint/nowire

Nowire
pull/465/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
297772e009
32 changed files with 762 additions and 461 deletions
  1. +1
    -1
      consensus/common_test.go
  2. +1
    -1
      consensus/mempool_test.go
  3. +4
    -4
      consensus/reactor.go
  4. +3
    -3
      consensus/reactor_test.go
  5. +15
    -15
      consensus/state_test.go
  6. +2
    -2
      rpc/client/event_test.go
  7. +3
    -3
      rpc/client/helpers.go
  8. +44
    -48
      rpc/client/httpclient.go
  9. +2
    -2
      rpc/core/events.go
  10. +1
    -1
      rpc/core/mempool.go
  11. +23
    -150
      rpc/core/routes.go
  12. +3
    -0
      rpc/core/tx.go
  13. +1
    -58
      rpc/core/types/responses.go
  14. +9
    -13
      rpc/lib/client/http_client.go
  15. +7
    -8
      rpc/lib/client/ws_client.go
  16. +78
    -41
      rpc/lib/rpc_test.go
  17. +4
    -6
      rpc/lib/server/handlers.go
  18. +133
    -0
      rpc/lib/server/parse_test.go
  19. +290
    -0
      rpc/lib/server/wire.go
  20. +8
    -3
      rpc/lib/types/types.go
  21. +39
    -45
      rpc/test/client_test.go
  22. +9
    -8
      rpc/test/helpers.go
  23. +1
    -1
      test/app/counter_test.sh
  24. +2
    -2
      test/app/dummy_test.sh
  25. +5
    -5
      test/p2p/atomic_broadcast/test.sh
  26. +4
    -4
      test/p2p/basic/test.sh
  27. +4
    -4
      test/p2p/fast_sync/check_peer.sh
  28. +3
    -3
      test/p2p/kill_all/check_peers.sh
  29. +1
    -1
      test/p2p/pex/check_peer.sh
  30. +2
    -2
      test/persist/test_failure_indices.sh
  31. +2
    -2
      test/persist/test_simple.sh
  32. +58
    -25
      types/events.go

+ 1
- 1
consensus/common_test.go View File

@ -206,7 +206,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
go func() { go func() {
for { for {
v := <-voteCh0 v := <-voteCh0
vote := v.(types.EventDataVote)
vote := v.(types.TMEventData).Unwrap().(types.EventDataVote)
// we only fire for our own votes // we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) { if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- v voteCh <- v


+ 1
- 1
consensus/mempool_test.go View File

@ -44,7 +44,7 @@ func TestTxConcurrentWithCommit(t *testing.T) {
for nTxs := 0; nTxs < NTxs; { for nTxs := 0; nTxs < NTxs; {
select { select {
case b := <-newBlockCh: case b := <-newBlockCh:
nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs
nTxs += b.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block.Header.NumTxs
case <-ticker.C: case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions") panic("Timed out waiting to commit blocks with transactions")
} }


+ 4
- 4
consensus/reactor.go View File

@ -8,11 +8,11 @@ import (
"sync" "sync"
"time" "time"
. "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
) )
const ( const (
@ -299,12 +299,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
func (conR *ConsensusReactor) registerEventCallbacks() { func (conR *ConsensusReactor) registerEventCallbacks() {
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
conR.broadcastNewRoundStep(rs) conR.broadcastNewRoundStep(rs)
}) })
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
edv := data.(types.EventDataVote)
edv := data.Unwrap().(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote) conR.broadcastHasVoteMessage(edv.Vote)
}) })
} }


+ 3
- 3
consensus/reactor_test.go View File

@ -8,10 +8,10 @@ import (
"github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tmlibs/events"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tmlibs/events"
) )
func init() { func init() {
@ -252,7 +252,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
newBlockI := <-eventChans[j] newBlockI := <-eventChans[j]
newBlock := newBlockI.(types.EventDataNewBlock).Block
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
log.Warn("Got block", "height", newBlock.Height, "validator", j) log.Warn("Got block", "height", newBlock.Height, "validator", j)
err := validateBlock(newBlock, activeVals) err := validateBlock(newBlock, activeVals)
if err != nil { if err != nil {


+ 15
- 15
consensus/state_test.go View File

@ -6,9 +6,9 @@ import (
"testing" "testing"
"time" "time"
. "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
) )
func init() { func init() {
@ -248,7 +248,7 @@ func TestFullRound1(t *testing.T) {
// grab proposal // grab proposal
re := <-propCh re := <-propCh
propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote <-voteCh // wait for prevote
// NOTE: voteChan cap of 0 ensures we can complete this // NOTE: voteChan cap of 0 ensures we can complete this
@ -345,7 +345,7 @@ func TestLockNoPOL(t *testing.T) {
cs1.startRoutines(0) cs1.startRoutines(0)
re := <-proposalCh re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -385,7 +385,7 @@ func TestLockNoPOL(t *testing.T) {
// now we're on a new round and not the proposer, so wait for timeout // now we're on a new round and not the proposer, so wait for timeout
re = <-timeoutProposeCh re = <-timeoutProposeCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.ProposalBlock != nil { if rs.ProposalBlock != nil {
panic("Expected proposal block to be nil") panic("Expected proposal block to be nil")
@ -429,7 +429,7 @@ func TestLockNoPOL(t *testing.T) {
incrementRound(vs2) incrementRound(vs2)
re = <-proposalCh re = <-proposalCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
// now we're on a new round and are the proposer // now we're on a new round and are the proposer
if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) {
@ -518,7 +518,7 @@ func TestLockPOLRelock(t *testing.T) {
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -589,9 +589,9 @@ func TestLockPOLRelock(t *testing.T) {
_, _ = <-voteCh, <-voteCh _, _ = <-voteCh, <-voteCh
be := <-newBlockCh be := <-newBlockCh
b := be.(types.EventDataNewBlockHeader)
b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader)
re = <-newRoundCh re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.Height != 2 { if rs.Height != 2 {
panic("Expected height to increment") panic("Expected height to increment")
} }
@ -627,7 +627,7 @@ func TestLockPOLUnlock(t *testing.T) {
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -653,7 +653,7 @@ func TestLockPOLUnlock(t *testing.T) {
// timeout to new round // timeout to new round
re = <-timeoutWaitCh re = <-timeoutWaitCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
lockedBlockHash := rs.LockedBlock.Hash() lockedBlockHash := rs.LockedBlock.Hash()
//XXX: this isnt gauranteed to get there before the timeoutPropose ... //XXX: this isnt gauranteed to get there before the timeoutPropose ...
@ -713,7 +713,7 @@ func TestLockPOLSafety1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
propBlock := rs.ProposalBlock propBlock := rs.ProposalBlock
<-voteCh // prevote <-voteCh // prevote
@ -761,7 +761,7 @@ func TestLockPOLSafety1(t *testing.T) {
re = <-proposalCh re = <-proposalCh
} }
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.LockedBlock != nil { if rs.LockedBlock != nil {
panic("we should not be locked!") panic("we should not be locked!")
@ -1009,7 +1009,7 @@ func TestHalt1(t *testing.T) {
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(types.EventDataRoundState).RoundState.(*RoundState)
rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
propBlock := rs.ProposalBlock propBlock := rs.ProposalBlock
propBlockParts := propBlock.MakePartSet(partSize) propBlockParts := propBlock.MakePartSet(partSize)
@ -1032,7 +1032,7 @@ func TestHalt1(t *testing.T) {
// timeout to new round // timeout to new round
<-timeoutWaitCh <-timeoutWaitCh
re = <-newRoundCh re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
log.Notice("### ONTO ROUND 1") log.Notice("### ONTO ROUND 1")
/*Round2 /*Round2
@ -1050,7 +1050,7 @@ func TestHalt1(t *testing.T) {
// receiving that precommit should take us straight to commit // receiving that precommit should take us straight to commit
<-newBlockCh <-newBlockCh
re = <-newRoundCh re = <-newRoundCh
rs = re.(types.EventDataRoundState).RoundState.(*RoundState)
rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState)
if rs.Height != 2 { if rs.Height != 2 {
panic("expected height to increment") panic("expected height to increment")


+ 2
- 2
rpc/client/event_test.go View File

@ -25,7 +25,7 @@ func TestHeaderEvents(t *testing.T) {
evtTyp := types.EventStringNewBlockHeader() evtTyp := types.EventStringNewBlockHeader()
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// TODO: more checks... // TODO: more checks...
} }
@ -56,7 +56,7 @@ func TestTxEvents(t *testing.T) {
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info // and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
txe, ok := evt.Unwrap().(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx // make sure this is the proper tx
require.EqualValues(tx, txe.Tx) require.EqualValues(tx, txe.Tx)


+ 3
- 3
rpc/client/helpers.go View File

@ -4,9 +4,9 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events" events "github.com/tendermint/tmlibs/events"
"github.com/tendermint/tendermint/types"
) )
// Waiter is informed of current height, decided whether to quit early // Waiter is informed of current height, decided whether to quit early
@ -77,12 +77,12 @@ func WaitForOneEvent(evsw types.EventSwitch,
select { select {
case <-quit: case <-quit:
return nil, errors.New("timed out waiting for event")
return types.TMEventData{}, errors.New("timed out waiting for event")
case evt := <-evts: case evt := <-evts:
tmevt, ok := evt.(types.TMEventData) tmevt, ok := evt.(types.TMEventData)
if ok { if ok {
return tmevt, nil return tmevt, nil
} }
return nil, errors.Errorf("Got unexpected event type: %#v", evt)
return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt)
} }
} }

+ 44
- 48
rpc/client/httpclient.go View File

@ -1,10 +1,10 @@
package client package client
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
wire "github.com/tendermint/go-wire"
data "github.com/tendermint/go-wire/data" data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/rpc/lib/client"
@ -50,42 +50,41 @@ func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
} }
func (c *HTTP) Status() (*ctypes.ResultStatus, error) { func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("status", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultStatus)
_, err := c.rpc.Call("status", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Status") return nil, errors.Wrap(err, "Status")
} }
// note: panics if rpc doesn't match. okay???
return tmResult.Unwrap().(*ctypes.ResultStatus), nil
return result, nil
} }
func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) { func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("abci_info", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultABCIInfo)
_, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "ABCIInfo") return nil, errors.Wrap(err, "ABCIInfo")
} }
return tmResult.Unwrap().(*ctypes.ResultABCIInfo), nil
return result, nil
} }
func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) { func (c *HTTP) ABCIQuery(path string, data data.Bytes, prove bool) (*ctypes.ResultABCIQuery, error) {
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultABCIQuery)
_, err := c.rpc.Call("abci_query", _, err := c.rpc.Call("abci_query",
map[string]interface{}{"path": path, "data": data, "prove": prove}, map[string]interface{}{"path": path, "data": data, "prove": prove},
tmResult)
result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "ABCIQuery") return nil, errors.Wrap(err, "ABCIQuery")
} }
return tmResult.Unwrap().(*ctypes.ResultABCIQuery), nil
return result, nil
} }
func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
result := new(ctypes.ResultBroadcastTxCommit)
_, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "broadcast_tx_commit") return nil, errors.Wrap(err, "broadcast_tx_commit")
} }
return tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit), nil
return result, nil
} }
func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
@ -97,90 +96,90 @@ func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
} }
func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, tmResult)
result := new(ctypes.ResultBroadcastTx)
_, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, route) return nil, errors.Wrap(err, route)
} }
return tmResult.Unwrap().(*ctypes.ResultBroadcastTx), nil
return result, nil
} }
func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) { func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("net_info", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultNetInfo)
_, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "NetInfo") return nil, errors.Wrap(err, "NetInfo")
} }
return tmResult.Unwrap().(*ctypes.ResultNetInfo), nil
return result, nil
} }
func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultDumpConsensusState)
_, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "DumpConsensusState") return nil, errors.Wrap(err, "DumpConsensusState")
} }
return tmResult.Unwrap().(*ctypes.ResultDumpConsensusState), nil
return result, nil
} }
func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { func (c *HTTP) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultBlockchainInfo)
_, err := c.rpc.Call("blockchain", _, err := c.rpc.Call("blockchain",
map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight}, map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
tmResult)
result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "BlockchainInfo") return nil, errors.Wrap(err, "BlockchainInfo")
} }
return tmResult.Unwrap().(*ctypes.ResultBlockchainInfo), nil
return result, nil
} }
func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) { func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("genesis", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultGenesis)
_, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Genesis") return nil, errors.Wrap(err, "Genesis")
} }
return tmResult.Unwrap().(*ctypes.ResultGenesis), nil
return result, nil
} }
func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) { func (c *HTTP) Block(height int) (*ctypes.ResultBlock, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("block", map[string]interface{}{"height": height}, tmResult)
result := new(ctypes.ResultBlock)
_, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Block") return nil, errors.Wrap(err, "Block")
} }
return tmResult.Unwrap().(*ctypes.ResultBlock), nil
return result, nil
} }
func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) { func (c *HTTP) Commit(height int) (*ctypes.ResultCommit, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, tmResult)
result := new(ctypes.ResultCommit)
_, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Commit") return nil, errors.Wrap(err, "Commit")
} }
return tmResult.Unwrap().(*ctypes.ResultCommit), nil
return result, nil
} }
func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultTx)
query := map[string]interface{}{ query := map[string]interface{}{
"hash": hash, "hash": hash,
"prove": prove, "prove": prove,
} }
_, err := c.rpc.Call("tx", query, tmResult)
_, err := c.rpc.Call("tx", query, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Tx") return nil, errors.Wrap(err, "Tx")
} }
return tmResult.Unwrap().(*ctypes.ResultTx), nil
return result, nil
} }
func (c *HTTP) Validators() (*ctypes.ResultValidators, error) { func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("validators", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultValidators)
_, err := c.rpc.Call("validators", map[string]interface{}{}, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Validators") return nil, errors.Wrap(err, "Validators")
} }
return tmResult.Unwrap().(*ctypes.ResultValidators), nil
return result, nil
} }
/** websocket event stuff here... **/ /** websocket event stuff here... **/
@ -335,18 +334,15 @@ func (w *WSEvents) eventListener() {
// some implementation of types.TMEventData, and sends it off // some implementation of types.TMEventData, and sends it off
// on the merry way to the EventSwitch // on the merry way to the EventSwitch
func (w *WSEvents) parseEvent(data []byte) (err error) { func (w *WSEvents) parseEvent(data []byte) (err error) {
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, data, &err)
result := new(ctypes.ResultEvent)
err = json.Unmarshal(data, result)
if err != nil { if err != nil {
return err
}
event, ok := result.Unwrap().(*ctypes.ResultEvent)
if !ok {
// ignore silently (eg. subscribe, unsubscribe and maybe other events) // ignore silently (eg. subscribe, unsubscribe and maybe other events)
// TODO: ?
return nil return nil
} }
// looks good! let's fire this baby! // looks good! let's fire this baby!
w.EventSwitch.FireEvent(event.Name, event.Data)
w.EventSwitch.FireEvent(result.Name, result.Data)
return nil return nil
} }


+ 2
- 2
rpc/core/events.go View File

@ -11,8 +11,8 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) { types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) {
// NOTE: EventSwitch callbacks must be nonblocking // NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event" // NOTE: RPCResponses of subscribed events have id suffix "#event"
tmResult := ctypes.TMResult{&ctypes.ResultEvent{event, msg}}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, ""))
tmResult := &ctypes.ResultEvent{event, msg}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", tmResult, ""))
}) })
return &ctypes.ResultSubscribe{}, nil return &ctypes.ResultSubscribe{}, nil
} }


+ 1
- 1
rpc/core/mempool.go View File

@ -50,7 +50,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block // subscribe to tx being committed in block
deliverTxResCh := make(chan types.EventDataTx, 1) deliverTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
deliverTxResCh <- data.(types.EventDataTx)
deliverTxResCh <- data.Unwrap().(types.EventDataTx)
}) })
// broadcast the tx and register checktx callback // broadcast the tx and register checktx callback


+ 23
- 150
rpc/core/routes.go View File

@ -1,173 +1,46 @@
package core package core
import ( import (
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpc "github.com/tendermint/tendermint/rpc/lib/server" rpc "github.com/tendermint/tendermint/rpc/lib/server"
"github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
) )
// TODO: better system than "unsafe" prefix // TODO: better system than "unsafe" prefix
var Routes = map[string]*rpc.RPCFunc{ var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events. // subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"),
"unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"),
"subscribe": rpc.NewWSRPCFunc(Subscribe, "event"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "event"),
// info API // info API
"status": rpc.NewRPCFunc(StatusResult, ""),
"net_info": rpc.NewRPCFunc(NetInfoResult, ""),
"blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(GenesisResult, ""),
"block": rpc.NewRPCFunc(BlockResult, "height"),
"commit": rpc.NewRPCFunc(CommitResult, "height"),
"tx": rpc.NewRPCFunc(TxResult, "hash,prove"),
"validators": rpc.NewRPCFunc(ValidatorsResult, ""),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
"status": rpc.NewRPCFunc(Status, ""),
"net_info": rpc.NewRPCFunc(NetInfo, ""),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(Genesis, ""),
"block": rpc.NewRPCFunc(Block, "height"),
"commit": rpc.NewRPCFunc(Commit, "height"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove"),
"validators": rpc.NewRPCFunc(Validators, ""),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""),
// broadcast API // broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"),
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"),
// abci API // abci API
"abci_query": rpc.NewRPCFunc(ABCIQueryResult, "path,data,prove"),
"abci_info": rpc.NewRPCFunc(ABCIInfoResult, ""),
"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,prove"),
"abci_info": rpc.NewRPCFunc(ABCIInfo, ""),
// control API // control API
"dial_seeds": rpc.NewRPCFunc(UnsafeDialSeedsResult, "seeds"),
"dial_seeds": rpc.NewRPCFunc(UnsafeDialSeeds, "seeds"),
"unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""),
// config is not in general thread safe. expose specifics if you need em // config is not in general thread safe. expose specifics if you need em
// "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"),
// "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfig, "type,key,value"),
// profiler API // profiler API
"unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"),
"unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfilerResult, ""),
"unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfileResult, "filename"),
}
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
res, err := Subscribe(wsCtx, event)
return ctypes.TMResult{res}, err
}
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
res, err := Unsubscribe(wsCtx, event)
return ctypes.TMResult{res}, err
}
func StatusResult() (ctypes.TMResult, error) {
res, err := Status()
return ctypes.TMResult{res}, err
}
func NetInfoResult() (ctypes.TMResult, error) {
res, err := NetInfo()
return ctypes.TMResult{res}, err
}
func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) {
res, err := UnsafeDialSeeds(seeds)
return ctypes.TMResult{res}, err
}
func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) {
res, err := BlockchainInfo(min, max)
return ctypes.TMResult{res}, err
}
func GenesisResult() (ctypes.TMResult, error) {
res, err := Genesis()
return ctypes.TMResult{res}, err
}
func BlockResult(height int) (ctypes.TMResult, error) {
res, err := Block(height)
return ctypes.TMResult{res}, err
}
func CommitResult(height int) (ctypes.TMResult, error) {
res, err := Commit(height)
return ctypes.TMResult{res}, err
}
func ValidatorsResult() (ctypes.TMResult, error) {
res, err := Validators()
return ctypes.TMResult{res}, err
}
func DumpConsensusStateResult() (ctypes.TMResult, error) {
res, err := DumpConsensusState()
return ctypes.TMResult{res}, err
}
func UnconfirmedTxsResult() (ctypes.TMResult, error) {
res, err := UnconfirmedTxs()
return ctypes.TMResult{res}, err
}
func NumUnconfirmedTxsResult() (ctypes.TMResult, error) {
res, err := NumUnconfirmedTxs()
return ctypes.TMResult{res}, err
}
// Tx allow user to query the transaction results. `nil` could mean the
// transaction is in the mempool, invalidated, or was not send in the first
// place.
func TxResult(hash []byte, prove bool) (ctypes.TMResult, error) {
res, err := Tx(hash, prove)
return ctypes.TMResult{res}, err
}
func BroadcastTxCommitResult(tx types.Tx) (ctypes.TMResult, error) {
res, err := BroadcastTxCommit(tx)
return ctypes.TMResult{res}, err
}
func BroadcastTxSyncResult(tx types.Tx) (ctypes.TMResult, error) {
res, err := BroadcastTxSync(tx)
return ctypes.TMResult{res}, err
}
func BroadcastTxAsyncResult(tx types.Tx) (ctypes.TMResult, error) {
res, err := BroadcastTxAsync(tx)
return ctypes.TMResult{res}, err
}
func ABCIQueryResult(path string, data data.Bytes, prove bool) (ctypes.TMResult, error) {
res, err := ABCIQuery(path, data, prove)
return ctypes.TMResult{res}, err
}
func ABCIInfoResult() (ctypes.TMResult, error) {
res, err := ABCIInfo()
return ctypes.TMResult{res}, err
}
func UnsafeFlushMempoolResult() (ctypes.TMResult, error) {
res, err := UnsafeFlushMempool()
return ctypes.TMResult{res}, err
}
func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) {
res, err := UnsafeSetConfig(typ, key, value)
return ctypes.TMResult{res}, err
}
func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) {
res, err := UnsafeStartCPUProfiler(filename)
return ctypes.TMResult{res}, err
}
func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) {
res, err := UnsafeStopCPUProfiler()
return ctypes.TMResult{res}, err
}
func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) {
res, err := UnsafeWriteHeapProfile(filename)
return ctypes.TMResult{res}, err
"unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfiler, "filename"),
"unsafe_stop_cpu_profiler": rpc.NewRPCFunc(UnsafeStopCPUProfiler, ""),
"unsafe_write_heap_profile": rpc.NewRPCFunc(UnsafeWriteHeapProfile, "filename"),
} }

+ 3
- 0
rpc/core/tx.go View File

@ -8,6 +8,9 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// Tx allow user to query the transaction results. `nil` could mean the
// transaction is in the mempool, invalidated, or was not send in the first
// place.
func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
// if index is disabled, return error // if index is disabled, return error


+ 1
- 58
rpc/core/types/responses.go View File

@ -7,7 +7,7 @@ import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/rpc/lib/types"
// "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -212,60 +212,3 @@ const (
ResultNameUnsafeWriteHeapProfile = "unsafe_write_heap" ResultNameUnsafeWriteHeapProfile = "unsafe_write_heap"
ResultNameUnsafeFlushMempool = "unsafe_flush_mempool" ResultNameUnsafeFlushMempool = "unsafe_flush_mempool"
) )
type TMResultInner interface {
rpctypes.Result
}
type TMResult struct {
TMResultInner `json:"unwrap"`
}
func (tmr TMResult) MarshalJSON() ([]byte, error) {
return tmResultMapper.ToJSON(tmr.TMResultInner)
}
func (tmr *TMResult) UnmarshalJSON(data []byte) (err error) {
parsed, err := tmResultMapper.FromJSON(data)
if err == nil && parsed != nil {
tmr.TMResultInner = parsed.(TMResultInner)
}
return
}
func (tmr TMResult) Unwrap() TMResultInner {
tmrI := tmr.TMResultInner
for wrap, ok := tmrI.(TMResult); ok; wrap, ok = tmrI.(TMResult) {
tmrI = wrap.TMResultInner
}
return tmrI
}
func (tmr TMResult) Empty() bool {
return tmr.TMResultInner == nil
}
var tmResultMapper = data.NewMapper(TMResult{}).
RegisterImplementation(&ResultGenesis{}, ResultNameGenesis, ResultTypeGenesis).
RegisterImplementation(&ResultBlockchainInfo{}, ResultNameBlockchainInfo, ResultTypeBlockchainInfo).
RegisterImplementation(&ResultBlock{}, ResultNameBlock, ResultTypeBlock).
RegisterImplementation(&ResultCommit{}, ResultNameCommit, ResultTypeCommit).
RegisterImplementation(&ResultStatus{}, ResultNameStatus, ResultTypeStatus).
RegisterImplementation(&ResultNetInfo{}, ResultNameNetInfo, ResultTypeNetInfo).
RegisterImplementation(&ResultDialSeeds{}, ResultNameDialSeeds, ResultTypeDialSeeds).
RegisterImplementation(&ResultValidators{}, ResultNameValidators, ResultTypeValidators).
RegisterImplementation(&ResultDumpConsensusState{}, ResultNameDumpConsensusState, ResultTypeDumpConsensusState).
RegisterImplementation(&ResultBroadcastTx{}, ResultNameBroadcastTx, ResultTypeBroadcastTx).
RegisterImplementation(&ResultBroadcastTxCommit{}, ResultNameBroadcastTxCommit, ResultTypeBroadcastTxCommit).
RegisterImplementation(&ResultTx{}, ResultNameTx, ResultTypeTx).
RegisterImplementation(&ResultUnconfirmedTxs{}, ResultNameUnconfirmedTxs, ResultTypeUnconfirmedTxs).
RegisterImplementation(&ResultSubscribe{}, ResultNameSubscribe, ResultTypeSubscribe).
RegisterImplementation(&ResultUnsubscribe{}, ResultNameUnsubscribe, ResultTypeUnsubscribe).
RegisterImplementation(&ResultEvent{}, ResultNameEvent, ResultTypeEvent).
RegisterImplementation(&ResultUnsafeSetConfig{}, ResultNameUnsafeSetConfig, ResultTypeUnsafeSetConfig).
RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeStartCPUProfiler, ResultTypeUnsafeStartCPUProfiler).
RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeStopCPUProfiler, ResultTypeUnsafeStopCPUProfiler).
RegisterImplementation(&ResultUnsafeProfile{}, ResultNameUnsafeWriteHeapProfile, ResultTypeUnsafeWriteHeapProfile).
RegisterImplementation(&ResultUnsafeFlushMempool{}, ResultNameUnsafeFlushMempool, ResultTypeUnsafeFlushMempool).
RegisterImplementation(&ResultABCIQuery{}, ResultNameABCIQuery, ResultTypeABCIQuery).
RegisterImplementation(&ResultABCIInfo{}, ResultNameABCIInfo, ResultTypeABCIInfo)

+ 9
- 13
rpc/lib/client/http_client.go View File

@ -12,7 +12,6 @@ import (
"strings" "strings"
"github.com/pkg/errors" "github.com/pkg/errors"
wire "github.com/tendermint/go-wire"
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
) )
@ -70,15 +69,15 @@ func NewJSONRPCClient(remote string) *JSONRPCClient {
func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
// we need this step because we attempt to decode values using `go-wire` // we need this step because we attempt to decode values using `go-wire`
// (handlers.go:176) on the server side // (handlers.go:176) on the server side
encodedParams := make(map[string]interface{})
for k, v := range params {
bytes := json.RawMessage(wire.JSONBytes(v))
encodedParams[k] = &bytes
}
// encodedParams := make(map[string]interface{})
// for k, v := range params {
// bytes := json.RawMessage(wire.JSONBytes(v))
// encodedParams[k] = &bytes
// }
request := types.RPCRequest{ request := types.RPCRequest{
JSONRPC: "2.0", JSONRPC: "2.0",
Method: method, Method: method,
Params: encodedParams,
Params: params,
ID: "", ID: "",
} }
requestBytes, err := json.Marshal(request) requestBytes, err := json.Marshal(request)
@ -153,7 +152,7 @@ func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface
return nil, errors.Errorf("Response error: %v", errorStr) return nil, errors.Errorf("Response error: %v", errorStr)
} }
// unmarshal the RawMessage into the result // unmarshal the RawMessage into the result
result = wire.ReadJSONPtr(result, *response.Result, &err)
err = json.Unmarshal(*response.Result, result)
if err != nil { if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err) return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err)
} }
@ -176,8 +175,6 @@ func argsToURLValues(args map[string]interface{}) (url.Values, error) {
} }
func argsToJson(args map[string]interface{}) error { func argsToJson(args map[string]interface{}) error {
var n int
var err error
for k, v := range args { for k, v := range args {
rt := reflect.TypeOf(v) rt := reflect.TypeOf(v)
isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8
@ -188,12 +185,11 @@ func argsToJson(args map[string]interface{}) error {
} }
// Pass everything else to go-wire // Pass everything else to go-wire
buf := new(bytes.Buffer)
wire.WriteJSON(v, buf, &n, &err)
data, err := json.Marshal(v)
if err != nil { if err != nil {
return err return err
} }
args[k] = buf.String()
args[k] = string(data)
} }
return nil return nil
} }

+ 7
- 8
rpc/lib/client/ws_client.go View File

@ -8,9 +8,8 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/tmlibs/common"
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
) )
const ( const (
@ -157,15 +156,15 @@ func (wsc *WSClient) Unsubscribe(eventid string) error {
func (wsc *WSClient) Call(method string, params map[string]interface{}) error { func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
// we need this step because we attempt to decode values using `go-wire` // we need this step because we attempt to decode values using `go-wire`
// (handlers.go:470) on the server side // (handlers.go:470) on the server side
encodedParams := make(map[string]interface{})
for k, v := range params {
bytes := json.RawMessage(wire.JSONBytes(v))
encodedParams[k] = &bytes
}
// encodedParams := make(map[string]interface{})
// for k, v := range params {
// bytes := json.RawMessage(wire.JSONBytes(v))
// encodedParams[k] = &bytes
// }
err := wsc.WriteJSON(types.RPCRequest{ err := wsc.WriteJSON(types.RPCRequest{
JSONRPC: "2.0", JSONRPC: "2.0",
Method: method, Method: method,
Params: encodedParams,
Params: params,
ID: "", ID: "",
}) })
return err return err


+ 78
- 41
rpc/lib/rpc_test.go View File

@ -3,6 +3,7 @@ package rpc
import ( import (
"bytes" "bytes"
crand "crypto/rand" crand "crypto/rand"
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http" "net/http"
@ -12,7 +13,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data"
client "github.com/tendermint/tendermint/rpc/lib/client" client "github.com/tendermint/tendermint/rpc/lib/client"
server "github.com/tendermint/tendermint/rpc/lib/server" server "github.com/tendermint/tendermint/rpc/lib/server"
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
@ -28,42 +29,51 @@ const (
websocketEndpoint = "/websocket/endpoint" websocketEndpoint = "/websocket/endpoint"
) )
// Define a type for results and register concrete versions
type Result interface{}
type ResultEcho struct { type ResultEcho struct {
Value string
Value string `json:"value"`
}
type ResultEchoInt struct {
Value int `json:"value"`
} }
type ResultEchoBytes struct { type ResultEchoBytes struct {
Value []byte
Value []byte `json:"value"`
} }
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultEcho{}, 0x1},
wire.ConcreteType{&ResultEchoBytes{}, 0x2},
)
type ResultEchoDataBytes struct {
Value data.Bytes `json:"value"`
}
// Define some routes // Define some routes
var Routes = map[string]*server.RPCFunc{ var Routes = map[string]*server.RPCFunc{
"echo": server.NewRPCFunc(EchoResult, "arg"),
"echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"),
"echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"),
"echo": server.NewRPCFunc(EchoResult, "arg"),
"echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"),
"echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"),
"echo_data_bytes": server.NewRPCFunc(EchoDataBytesResult, "arg"),
"echo_int": server.NewRPCFunc(EchoIntResult, "arg"),
} }
func EchoResult(v string) (Result, error) {
func EchoResult(v string) (*ResultEcho, error) {
return &ResultEcho{v}, nil return &ResultEcho{v}, nil
} }
func EchoWSResult(wsCtx types.WSRPCContext, v string) (Result, error) {
func EchoWSResult(wsCtx types.WSRPCContext, v string) (*ResultEcho, error) {
return &ResultEcho{v}, nil return &ResultEcho{v}, nil
} }
func EchoBytesResult(v []byte) (Result, error) {
func EchoIntResult(v int) (*ResultEchoInt, error) {
return &ResultEchoInt{v}, nil
}
func EchoBytesResult(v []byte) (*ResultEchoBytes, error) {
return &ResultEchoBytes{v}, nil return &ResultEchoBytes{v}, nil
} }
func EchoDataBytesResult(v data.Bytes) (*ResultEchoDataBytes, error) {
return &ResultEchoDataBytes{v}, nil
}
// launch unix and tcp servers // launch unix and tcp servers
func init() { func init() {
cmd := exec.Command("rm", "-f", unixSocket) cmd := exec.Command("rm", "-f", unixSocket)
@ -105,22 +115,44 @@ func echoViaHTTP(cl client.HTTPClient, val string) (string, error) {
params := map[string]interface{}{ params := map[string]interface{}{
"arg": val, "arg": val,
} }
var result Result
if _, err := cl.Call("echo", params, &result); err != nil {
result := new(ResultEcho)
if _, err := cl.Call("echo", params, result); err != nil {
return "", err return "", err
} }
return result.(*ResultEcho).Value, nil
return result.Value, nil
}
func echoIntViaHTTP(cl client.HTTPClient, val int) (int, error) {
params := map[string]interface{}{
"arg": val,
}
result := new(ResultEchoInt)
if _, err := cl.Call("echo_int", params, result); err != nil {
return 0, err
}
return result.Value, nil
} }
func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) { func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) {
params := map[string]interface{}{ params := map[string]interface{}{
"arg": bytes, "arg": bytes,
} }
var result Result
if _, err := cl.Call("echo_bytes", params, &result); err != nil {
result := new(ResultEchoBytes)
if _, err := cl.Call("echo_bytes", params, result); err != nil {
return []byte{}, err
}
return result.Value, nil
}
func echoDataBytesViaHTTP(cl client.HTTPClient, bytes data.Bytes) (data.Bytes, error) {
params := map[string]interface{}{
"arg": bytes,
}
result := new(ResultEchoDataBytes)
if _, err := cl.Call("echo_data_bytes", params, result); err != nil {
return []byte{}, err return []byte{}, err
} }
return result.(*ResultEchoBytes).Value, nil
return result.Value, nil
} }
func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { func testWithHTTPClient(t *testing.T, cl client.HTTPClient) {
@ -133,6 +165,16 @@ func testWithHTTPClient(t *testing.T, cl client.HTTPClient) {
got2, err := echoBytesViaHTTP(cl, val2) got2, err := echoBytesViaHTTP(cl, val2)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, got2, val2) assert.Equal(t, got2, val2)
val3 := data.Bytes(randBytes(t))
got3, err := echoDataBytesViaHTTP(cl, val3)
require.Nil(t, err)
assert.Equal(t, got3, val3)
val4 := rand.Intn(10000)
got4, err := echoIntViaHTTP(cl, val4)
require.Nil(t, err)
assert.Equal(t, got4, val4)
} }
func echoViaWS(cl *client.WSClient, val string) (string, error) { func echoViaWS(cl *client.WSClient, val string) (string, error) {
@ -146,12 +188,12 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) {
select { select {
case msg := <-cl.ResultsCh: case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
result := new(ResultEcho)
err = json.Unmarshal(msg, result)
if err != nil { if err != nil {
return "", nil return "", nil
} }
return (*result).(*ResultEcho).Value, nil
return result.Value, nil
case err := <-cl.ErrorsCh: case err := <-cl.ErrorsCh:
return "", err return "", err
} }
@ -168,12 +210,12 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
select { select {
case msg := <-cl.ResultsCh: case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
result := new(ResultEchoBytes)
err = json.Unmarshal(msg, result)
if err != nil { if err != nil {
return []byte{}, nil return []byte{}, nil
} }
return (*result).(*ResultEchoBytes).Value, nil
return result.Value, nil
case err := <-cl.ErrorsCh: case err := <-cl.ErrorsCh:
return []byte{}, err return []byte{}, err
} }
@ -241,20 +283,15 @@ func TestWSNewWSRPCFunc(t *testing.T) {
params := map[string]interface{}{ params := map[string]interface{}{
"arg": val, "arg": val,
} }
err = cl.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "echo_ws",
Params: params,
})
err = cl.Call("echo_ws", params)
require.Nil(t, err) require.Nil(t, err)
select { select {
case msg := <-cl.ResultsCh: case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
result := new(ResultEcho)
err = json.Unmarshal(msg, result)
require.Nil(t, err) require.Nil(t, err)
got := (*result).(*ResultEcho).Value
got := result.Value
assert.Equal(t, got, val) assert.Equal(t, got, val)
case err := <-cl.ErrorsCh: case err := <-cl.ErrorsCh:
t.Fatal(err) t.Fatal(err)
@ -279,10 +316,10 @@ func TestWSHandlesArrayParams(t *testing.T) {
select { select {
case msg := <-cl.ResultsCh: case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
result := new(ResultEcho)
err = json.Unmarshal(msg, result)
require.Nil(t, err) require.Nil(t, err)
got := (*result).(*ResultEcho).Value
got := result.Value
assert.Equal(t, got, val) assert.Equal(t, got, val)
case err := <-cl.ErrorsCh: case err := <-cl.ErrorsCh:
t.Fatalf("%+v", err) t.Fatalf("%+v", err)


+ 4
- 6
rpc/lib/server/handlers.go View File

@ -14,7 +14,7 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
wire "github.com/tendermint/go-wire"
//wire "github.com/tendermint/go-wire"
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events" events "github.com/tendermint/tmlibs/events"
@ -204,7 +204,7 @@ func jsonParamsToArgsWS(rpcFunc *RPCFunc, paramsI interface{}, wsCtx types.WSRPC
func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
var err error var err error
v := reflect.New(ty) v := reflect.New(ty)
wire.ReadJSONObjectPtr(v.Interface(), object, &err)
readJSONObjectPtr(v.Interface(), object, &err)
if err != nil { if err != nil {
return v, err return v, err
} }
@ -280,9 +280,8 @@ func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error
} }
func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
var err error
v := reflect.New(ty) v := reflect.New(ty)
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
err := json.Unmarshal([]byte(arg), v.Interface())
if err != nil { if err != nil {
return v, err return v, err
} }
@ -315,9 +314,8 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
} }
if isQuotedString && expectingByteSlice { if isQuotedString && expectingByteSlice {
var err error
v := reflect.New(reflect.TypeOf("")) v := reflect.New(reflect.TypeOf(""))
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
err := json.Unmarshal([]byte(arg), v.Interface())
if err != nil { if err != nil {
return reflect.ValueOf(nil), err, false return reflect.ValueOf(nil), err, false
} }


+ 133
- 0
rpc/lib/server/parse_test.go View File

@ -0,0 +1,133 @@
package rpcserver
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/go-wire/data"
)
func TestParseJSONMap(t *testing.T) {
assert := assert.New(t)
input := []byte(`{"value":"1234","height":22}`)
// naive is float,string
var p1 map[string]interface{}
err := json.Unmarshal(input, &p1)
if assert.Nil(err) {
h, ok := p1["height"].(float64)
if assert.True(ok, "%#v", p1["height"]) {
assert.EqualValues(22, h)
}
v, ok := p1["value"].(string)
if assert.True(ok, "%#v", p1["value"]) {
assert.EqualValues("1234", v)
}
}
// preloading map with values doesn't help
tmp := 0
p2 := map[string]interface{}{
"value": &data.Bytes{},
"height": &tmp,
}
err = json.Unmarshal(input, &p2)
if assert.Nil(err) {
h, ok := p2["height"].(float64)
if assert.True(ok, "%#v", p2["height"]) {
assert.EqualValues(22, h)
}
v, ok := p2["value"].(string)
if assert.True(ok, "%#v", p2["value"]) {
assert.EqualValues("1234", v)
}
}
// preload here with *pointers* to the desired types
// struct has unknown types, but hard-coded keys
tmp = 0
p3 := struct {
Value interface{} `json:"value"`
Height interface{} `json:"height"`
}{
Height: &tmp,
Value: &data.Bytes{},
}
err = json.Unmarshal(input, &p3)
if assert.Nil(err) {
h, ok := p3.Height.(*int)
if assert.True(ok, "%#v", p3.Height) {
assert.Equal(22, *h)
}
v, ok := p3.Value.(*data.Bytes)
if assert.True(ok, "%#v", p3.Value) {
assert.EqualValues([]byte{0x12, 0x34}, *v)
}
}
// simplest solution, but hard-coded
p4 := struct {
Value data.Bytes `json:"value"`
Height int `json:"height"`
}{}
err = json.Unmarshal(input, &p4)
if assert.Nil(err) {
assert.EqualValues(22, p4.Height)
assert.EqualValues([]byte{0x12, 0x34}, p4.Value)
}
// so, let's use this trick...
// dynamic keys on map, and we can deserialize to the desired types
var p5 map[string]*json.RawMessage
err = json.Unmarshal(input, &p5)
if assert.Nil(err) {
var h int
err = json.Unmarshal(*p5["height"], &h)
if assert.Nil(err) {
assert.Equal(22, h)
}
var v data.Bytes
err = json.Unmarshal(*p5["value"], &v)
if assert.Nil(err) {
assert.Equal(data.Bytes{0x12, 0x34}, v)
}
}
}
func TestParseJSONArray(t *testing.T) {
assert := assert.New(t)
input := []byte(`["1234",22]`)
// naive is float,string
var p1 []interface{}
err := json.Unmarshal(input, &p1)
if assert.Nil(err) {
v, ok := p1[0].(string)
if assert.True(ok, "%#v", p1[0]) {
assert.EqualValues("1234", v)
}
h, ok := p1[1].(float64)
if assert.True(ok, "%#v", p1[1]) {
assert.EqualValues(22, h)
}
}
// preloading map with values helps here (unlike map - p2 above)
tmp := 0
p2 := []interface{}{&data.Bytes{}, &tmp}
err = json.Unmarshal(input, &p2)
if assert.Nil(err) {
v, ok := p2[0].(*data.Bytes)
if assert.True(ok, "%#v", p2[0]) {
assert.EqualValues([]byte{0x12, 0x34}, *v)
}
h, ok := p2[1].(*int)
if assert.True(ok, "%#v", p2[1]) {
assert.EqualValues(22, *h)
}
}
}

+ 290
- 0
rpc/lib/server/wire.go View File

@ -0,0 +1,290 @@
package rpcserver
import (
"encoding/base64"
"encoding/hex"
"reflect"
"time"
"github.com/pkg/errors"
"github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common"
)
var (
timeType = wire.GetTypeFromStructDeclaration(struct{ time.Time }{})
)
func readJSONObjectPtr(o interface{}, object interface{}, err *error) interface{} {
rv, rt := reflect.ValueOf(o), reflect.TypeOf(o)
if rv.Kind() == reflect.Ptr {
readReflectJSON(rv.Elem(), rt.Elem(), wire.Options{}, object, err)
} else {
cmn.PanicSanity("ReadJSON(Object)Ptr expects o to be a pointer")
}
return o
}
func readByteJSON(o interface{}) (typeByte byte, rest interface{}, err error) {
oSlice, ok := o.([]interface{})
if !ok {
err = errors.New(cmn.Fmt("Expected type [Byte,?] but got type %v", reflect.TypeOf(o)))
return
}
if len(oSlice) != 2 {
err = errors.New(cmn.Fmt("Expected [Byte,?] len 2 but got len %v", len(oSlice)))
return
}
typeByte_, ok := oSlice[0].(float64)
typeByte = byte(typeByte_)
rest = oSlice[1]
return
}
// Contract: Caller must ensure that rt is supported
// (e.g. is recursively composed of supported native types, and structs and slices.)
// rv and rt refer to the object we're unmarhsaling into, whereas o is the result of naiive json unmarshal (map[string]interface{})
func readReflectJSON(rv reflect.Value, rt reflect.Type, opts wire.Options, o interface{}, err *error) {
// Get typeInfo
typeInfo := wire.GetTypeInfo(rt)
if rt.Kind() == reflect.Interface {
if !typeInfo.IsRegisteredInterface {
// There's no way we can read such a thing.
*err = errors.New(cmn.Fmt("Cannot read unregistered interface type %v", rt))
return
}
if o == nil {
return // nil
}
typeByte, rest, err_ := readByteJSON(o)
if err_ != nil {
*err = err_
return
}
crt, ok := typeInfo.ByteToType[typeByte]
if !ok {
*err = errors.New(cmn.Fmt("Byte %X not registered for interface %v", typeByte, rt))
return
}
if crt.Kind() == reflect.Ptr {
crt = crt.Elem()
crv := reflect.New(crt)
readReflectJSON(crv.Elem(), crt, opts, rest, err)
rv.Set(crv) // NOTE: orig rv is ignored.
} else {
crv := reflect.New(crt).Elem()
readReflectJSON(crv, crt, opts, rest, err)
rv.Set(crv) // NOTE: orig rv is ignored.
}
return
}
if rt.Kind() == reflect.Ptr {
if o == nil {
return // nil
}
// Create new struct if rv is nil.
if rv.IsNil() {
newRv := reflect.New(rt.Elem())
rv.Set(newRv)
rv = newRv
}
// Dereference pointer
rv, rt = rv.Elem(), rt.Elem()
typeInfo = wire.GetTypeInfo(rt)
// continue...
}
switch rt.Kind() {
case reflect.Array:
elemRt := rt.Elem()
length := rt.Len()
if elemRt.Kind() == reflect.Uint8 {
// Special case: Bytearrays
oString, ok := o.(string)
if !ok {
*err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o)))
return
}
// if its data.Bytes, use hex; else use base64
dbty := reflect.TypeOf(data.Bytes{})
var buf []byte
var err_ error
if rt == dbty {
buf, err_ = hex.DecodeString(oString)
} else {
buf, err_ = base64.StdEncoding.DecodeString(oString)
}
if err_ != nil {
*err = err_
return
}
if len(buf) != length {
*err = errors.New(cmn.Fmt("Expected bytearray of length %v but got %v", length, len(buf)))
return
}
//log.Info("Read bytearray", "bytes", buf)
reflect.Copy(rv, reflect.ValueOf(buf))
} else {
oSlice, ok := o.([]interface{})
if !ok {
*err = errors.New(cmn.Fmt("Expected array of %v but got type %v", rt, reflect.TypeOf(o)))
return
}
if len(oSlice) != length {
*err = errors.New(cmn.Fmt("Expected array of length %v but got %v", length, len(oSlice)))
return
}
for i := 0; i < length; i++ {
elemRv := rv.Index(i)
readReflectJSON(elemRv, elemRt, opts, oSlice[i], err)
}
//log.Info("Read x-array", "x", elemRt, "length", length)
}
case reflect.Slice:
elemRt := rt.Elem()
if elemRt.Kind() == reflect.Uint8 {
// Special case: Byteslices
oString, ok := o.(string)
if !ok {
*err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o)))
return
}
// if its data.Bytes, use hex; else use base64
dbty := reflect.TypeOf(data.Bytes{})
var buf []byte
var err_ error
if rt == dbty {
buf, err_ = hex.DecodeString(oString)
} else {
buf, err_ = base64.StdEncoding.DecodeString(oString)
}
if err_ != nil {
*err = err_
return
}
//log.Info("Read byteslice", "bytes", byteslice)
rv.Set(reflect.ValueOf(buf))
} else {
// Read length
oSlice, ok := o.([]interface{})
if !ok {
*err = errors.New(cmn.Fmt("Expected array of %v but got type %v", rt, reflect.TypeOf(o)))
return
}
length := len(oSlice)
//log.Info("Read slice", "length", length)
sliceRv := reflect.MakeSlice(rt, length, length)
// Read elems
for i := 0; i < length; i++ {
elemRv := sliceRv.Index(i)
readReflectJSON(elemRv, elemRt, opts, oSlice[i], err)
}
rv.Set(sliceRv)
}
case reflect.Struct:
if rt == timeType {
// Special case: time.Time
str, ok := o.(string)
if !ok {
*err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o)))
return
}
// try three ways, seconds, milliseconds, or microseconds...
t, err_ := time.Parse(time.RFC3339Nano, str)
if err_ != nil {
*err = err_
return
}
rv.Set(reflect.ValueOf(t))
} else {
if typeInfo.Unwrap {
f := typeInfo.Fields[0]
fieldIdx, fieldType, opts := f.Index, f.Type, f.Options
fieldRv := rv.Field(fieldIdx)
readReflectJSON(fieldRv, fieldType, opts, o, err)
} else {
oMap, ok := o.(map[string]interface{})
if !ok {
*err = errors.New(cmn.Fmt("Expected map but got type %v", reflect.TypeOf(o)))
return
}
// TODO: ensure that all fields are set?
// TODO: disallow unknown oMap fields?
for _, fieldInfo := range typeInfo.Fields {
f := fieldInfo
fieldIdx, fieldType, opts := f.Index, f.Type, f.Options
value, ok := oMap[opts.JSONName]
if !ok {
continue // Skip missing fields.
}
fieldRv := rv.Field(fieldIdx)
readReflectJSON(fieldRv, fieldType, opts, value, err)
}
}
}
case reflect.String:
str, ok := o.(string)
if !ok {
*err = errors.New(cmn.Fmt("Expected string but got type %v", reflect.TypeOf(o)))
return
}
//log.Info("Read string", "str", str)
rv.SetString(str)
case reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8, reflect.Int:
num, ok := o.(float64)
if !ok {
*err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o)))
return
}
//log.Info("Read num", "num", num)
rv.SetInt(int64(num))
case reflect.Uint64, reflect.Uint32, reflect.Uint16, reflect.Uint8, reflect.Uint:
num, ok := o.(float64)
if !ok {
*err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o)))
return
}
if num < 0 {
*err = errors.New(cmn.Fmt("Expected unsigned numeric but got %v", num))
return
}
//log.Info("Read num", "num", num)
rv.SetUint(uint64(num))
case reflect.Float64, reflect.Float32:
if !opts.Unsafe {
*err = errors.New("Wire float* support requires `wire:\"unsafe\"`")
return
}
num, ok := o.(float64)
if !ok {
*err = errors.New(cmn.Fmt("Expected numeric but got type %v", reflect.TypeOf(o)))
return
}
//log.Info("Read num", "num", num)
rv.SetFloat(num)
case reflect.Bool:
bl, ok := o.(bool)
if !ok {
*err = errors.New(cmn.Fmt("Expected boolean but got type %v", reflect.TypeOf(o)))
return
}
//log.Info("Read boolean", "boolean", bl)
rv.SetBool(bl)
default:
cmn.PanicSanity(cmn.Fmt("Unknown field type %v", rt.Kind()))
}
}

+ 8
- 3
rpc/lib/types/types.go View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"strings" "strings"
wire "github.com/tendermint/go-wire"
events "github.com/tendermint/tmlibs/events" events "github.com/tendermint/tmlibs/events"
) )
@ -52,8 +51,14 @@ type RPCResponse struct {
func NewRPCResponse(id string, res interface{}, err string) RPCResponse { func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
var raw *json.RawMessage var raw *json.RawMessage
if res != nil { if res != nil {
rawMsg := json.RawMessage(wire.JSONBytes(res))
raw = &rawMsg
var js []byte
js, err2 := json.Marshal(res)
if err2 == nil {
rawMsg := json.RawMessage(js)
raw = &rawMsg
} else {
err = err2.Error()
}
} }
return RPCResponse{ return RPCResponse{
JSONRPC: "2.0", JSONRPC: "2.0",


+ 39
- 45
rpc/test/client_test.go View File

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpc "github.com/tendermint/tendermint/rpc/lib/client" rpc "github.com/tendermint/tendermint/rpc/lib/client"
@ -38,12 +39,11 @@ func TestJSONStatus(t *testing.T) {
func testStatus(t *testing.T, client rpc.HTTPClient) { func testStatus(t *testing.T, client rpc.HTTPClient) {
chainID := GetConfig().GetString("chain_id") chainID := GetConfig().GetString("chain_id")
tmResult := new(ctypes.TMResult)
_, err := client.Call("status", map[string]interface{}{}, tmResult)
result := new(ctypes.ResultStatus)
_, err := client.Call("status", map[string]interface{}{}, result)
require.Nil(t, err) require.Nil(t, err)
status := tmResult.Unwrap().(*ctypes.ResultStatus)
assert.Equal(t, chainID, status.NodeInfo.Network)
assert.Equal(t, chainID, result.NodeInfo.Network)
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@ -69,13 +69,12 @@ func TestJSONBroadcastTxSync(t *testing.T) {
func testBroadcastTxSync(t *testing.T, client rpc.HTTPClient) { func testBroadcastTxSync(t *testing.T, client rpc.HTTPClient) {
mem := node.MempoolReactor().Mempool mem := node.MempoolReactor().Mempool
initMemSize := mem.Size() initMemSize := mem.Size()
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultBroadcastTx)
tx := randBytes(t) tx := randBytes(t)
_, err := client.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult)
_, err := client.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, result)
require.Nil(t, err) require.Nil(t, err)
res := tmResult.Unwrap().(*ctypes.ResultBroadcastTx)
require.Equal(t, abci.CodeType_OK, res.Code)
require.Equal(t, abci.CodeType_OK, result.Code)
require.Equal(t, initMemSize+1, mem.Size()) require.Equal(t, initMemSize+1, mem.Size())
txs := mem.Reap(1) txs := mem.Reap(1)
require.EqualValues(t, tx, txs[0]) require.EqualValues(t, tx, txs[0])
@ -92,14 +91,13 @@ func testTxKV(t *testing.T) ([]byte, []byte, types.Tx) {
} }
func sendTx(t *testing.T, client rpc.HTTPClient) ([]byte, []byte) { func sendTx(t *testing.T, client rpc.HTTPClient) ([]byte, []byte) {
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultBroadcastTxCommit)
k, v, tx := testTxKV(t) k, v, tx := testTxKV(t)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
require.Nil(t, err) require.Nil(t, err)
bres := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit)
require.NotNil(t, 0, bres.DeliverTx, "%#v", bres)
require.EqualValues(t, 0, bres.CheckTx.Code, "%#v", bres)
require.EqualValues(t, 0, bres.DeliverTx.Code, "%#v", bres)
require.NotNil(t, 0, result.DeliverTx, "%#v", result)
require.EqualValues(t, 0, result.CheckTx.Code, "%#v", result)
require.EqualValues(t, 0, result.DeliverTx.Code, "%#v", result)
return k, v return k, v
} }
@ -114,16 +112,15 @@ func TestJSONABCIQuery(t *testing.T) {
func testABCIQuery(t *testing.T, client rpc.HTTPClient) { func testABCIQuery(t *testing.T, client rpc.HTTPClient) {
k, _ := sendTx(t, client) k, _ := sendTx(t, client)
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultABCIQuery)
_, err := client.Call("abci_query", _, err := client.Call("abci_query",
map[string]interface{}{"path": "", "data": k, "prove": false}, tmResult)
map[string]interface{}{"path": "", "data": data.Bytes(k), "prove": false}, result)
require.Nil(t, err) require.Nil(t, err)
resQuery := tmResult.Unwrap().(*ctypes.ResultABCIQuery)
require.EqualValues(t, 0, resQuery.Code)
require.EqualValues(t, 0, result.Code)
// XXX: specific to value returned by the dummy // XXX: specific to value returned by the dummy
require.NotEqual(t, 0, len(resQuery.Value))
require.NotEqual(t, 0, len(result.Value))
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@ -140,15 +137,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) {
func testBroadcastTxCommit(t *testing.T, client rpc.HTTPClient) { func testBroadcastTxCommit(t *testing.T, client rpc.HTTPClient) {
require := require.New(t) require := require.New(t)
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultBroadcastTxCommit)
tx := randBytes(t) tx := randBytes(t)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
require.Nil(err) require.Nil(err)
res := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit)
checkTx := res.CheckTx
checkTx := result.CheckTx
require.Equal(abci.CodeType_OK, checkTx.Code) require.Equal(abci.CodeType_OK, checkTx.Code)
deliverTx := res.DeliverTx
deliverTx := result.DeliverTx
require.Equal(abci.CodeType_OK, deliverTx.Code) require.Equal(abci.CodeType_OK, deliverTx.Code)
mem := node.MempoolReactor().Mempool mem := node.MempoolReactor().Mempool
require.Equal(0, mem.Size()) require.Equal(0, mem.Size())
@ -178,16 +174,15 @@ func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
// first we broadcast a tx // first we broadcast a tx
tmResult := new(ctypes.TMResult)
result := new(ctypes.ResultBroadcastTxCommit)
txBytes := randBytes(t) txBytes := randBytes(t)
tx := types.Tx(txBytes) tx := types.Tx(txBytes)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, tmResult)
_, err := client.Call("broadcast_tx_commit", map[string]interface{}{"tx": txBytes}, result)
require.Nil(err) require.Nil(err)
res := tmResult.Unwrap().(*ctypes.ResultBroadcastTxCommit)
checkTx := res.CheckTx
checkTx := result.CheckTx
require.Equal(abci.CodeType_OK, checkTx.Code) require.Equal(abci.CodeType_OK, checkTx.Code)
deliverTx := res.DeliverTx
deliverTx := result.DeliverTx
require.Equal(abci.CodeType_OK, deliverTx.Code) require.Equal(abci.CodeType_OK, deliverTx.Code)
mem := node.MempoolReactor().Mempool mem := node.MempoolReactor().Mempool
require.Equal(0, mem.Size()) require.Equal(0, mem.Size())
@ -214,24 +209,23 @@ func testTx(t *testing.T, client rpc.HTTPClient, withIndexer bool) {
// now we query for the tx. // now we query for the tx.
// since there's only one tx, we know index=0. // since there's only one tx, we know index=0.
tmResult = new(ctypes.TMResult)
result2 := new(ctypes.ResultTx)
query := map[string]interface{}{ query := map[string]interface{}{
"hash": tc.hash, "hash": tc.hash,
"prove": tc.prove, "prove": tc.prove,
} }
_, err = client.Call("tx", query, tmResult)
_, err = client.Call("tx", query, result2)
valid := (withIndexer && tc.valid) valid := (withIndexer && tc.valid)
if !valid { if !valid {
require.NotNil(err, idx) require.NotNil(err, idx)
} else { } else {
require.Nil(err, idx) require.Nil(err, idx)
res2 := tmResult.Unwrap().(*ctypes.ResultTx)
assert.Equal(tx, res2.Tx, idx)
assert.Equal(res.Height, res2.Height, idx)
assert.Equal(0, res2.Index, idx)
assert.Equal(abci.CodeType_OK, res2.TxResult.Code, idx)
assert.Equal(tx, result2.Tx, idx)
assert.Equal(result.Height, result2.Height, idx)
assert.Equal(0, result2.Index, idx)
assert.Equal(abci.CodeType_OK, result2.TxResult.Code, idx)
// time to verify the proof // time to verify the proof
proof := res2.Proof
proof := result2.Proof
if tc.prove && assert.Equal(tx, proof.Data, idx) { if tc.prove && assert.Equal(tx, proof.Data, idx) {
assert.True(proof.Proof.Verify(proof.Index, proof.Total, tx.Hash(), proof.RootHash), idx) assert.True(proof.Proof.Verify(proof.Index, proof.Total, tx.Hash(), proof.RootHash), idx)
} }
@ -286,7 +280,7 @@ func TestWSBlockchainGrowth(t *testing.T) {
var initBlockN int var initBlockN int
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error {
block := eventData.(types.EventDataNewBlock).Block
block := eventData.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
if i == 0 { if i == 0 {
initBlockN = block.Header.Height initBlockN = block.Header.Height
} else { } else {
@ -315,12 +309,12 @@ func TestWSTxEvent(t *testing.T) {
}() }()
// send an tx // send an tx
tmResult := new(ctypes.TMResult)
_, err := GetJSONClient().Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult)
result := new(ctypes.ResultBroadcastTx)
_, err := GetJSONClient().Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, result)
require.Nil(err) require.Nil(err)
waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error {
evt, ok := b.(types.EventDataTx)
evt, ok := b.(types.TMEventData).Unwrap().(types.EventDataTx)
require.True(ok, "Got wrong event type: %#v", b) require.True(ok, "Got wrong event type: %#v", b)
require.Equal(tx, []byte(evt.Tx), "Returned different tx") require.Equal(tx, []byte(evt.Tx), "Returned different tx")
require.Equal(abci.CodeType_OK, evt.Code) require.Equal(abci.CodeType_OK, evt.Code)
@ -373,12 +367,12 @@ func TestWSDoubleFire(t *testing.T) {
// //
//func TestURIUnsafeSetConfig(t *testing.T) { //func TestURIUnsafeSetConfig(t *testing.T) {
// for _, testCase := range testCasesUnsafeSetConfig { // for _, testCase := range testCasesUnsafeSetConfig {
// tmResult := new(ctypes.TMResult)
// result := new(ctypes.TMResult)
// _, err := GetURIClient().Call("unsafe_set_config", map[string]interface{}{ // _, err := GetURIClient().Call("unsafe_set_config", map[string]interface{}{
// "type": testCase[0], // "type": testCase[0],
// "key": testCase[1], // "key": testCase[1],
// "value": testCase[2], // "value": testCase[2],
// }, tmResult)
// }, result)
// require.Nil(t, err) // require.Nil(t, err)
// } // }
// testUnsafeSetConfig(t) // testUnsafeSetConfig(t)
@ -386,10 +380,10 @@ func TestWSDoubleFire(t *testing.T) {
// //
//func TestJSONUnsafeSetConfig(t *testing.T) { //func TestJSONUnsafeSetConfig(t *testing.T) {
// for _, testCase := range testCasesUnsafeSetConfig { // for _, testCase := range testCasesUnsafeSetConfig {
// tmResult := new(ctypes.TMResult)
// result := new(ctypes.TMResult)
// _, err := GetJSONClient().Call("unsafe_set_config", // _, err := GetJSONClient().Call("unsafe_set_config",
// map[string]interface{}{"type": testCase[0], "key": testCase[1], "value": testCase[2]}, // map[string]interface{}{"type": testCase[0], "key": testCase[1], "value": testCase[2]},
// tmResult)
// result)
// require.Nil(t, err) // require.Nil(t, err)
// } // }
// testUnsafeSetConfig(t) // testUnsafeSetConfig(t)


+ 9
- 8
rpc/test/helpers.go View File

@ -1,6 +1,7 @@
package rpctest package rpctest
import ( import (
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
@ -11,7 +12,6 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
wire "github.com/tendermint/go-wire"
logger "github.com/tendermint/tmlibs/logger" logger "github.com/tendermint/tmlibs/logger"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
@ -131,15 +131,16 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo
for { for {
select { select {
case r := <-wsc.ResultsCh: case r := <-wsc.ResultsCh:
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, r, &err)
fmt.Println("GOT IT", string(r))
result := new(ctypes.ResultEvent)
err = json.Unmarshal(r, result)
if err != nil { if err != nil {
errCh <- err
break LOOP
fmt.Println("POOP", err)
// cant distinguish between error and wrong type ...
continue
} }
event, ok := result.Unwrap().(*ctypes.ResultEvent)
if ok && event.Name == eventid {
goodCh <- event.Data
if result.Name == eventid {
goodCh <- result.Data
break LOOP break LOOP
} }
case err := <-wsc.ErrorsCh: case err := <-wsc.ErrorsCh:


+ 1
- 1
test/app/counter_test.sh View File

@ -33,7 +33,7 @@ function sendTx() {
ERROR=`echo $RESPONSE | jq .error` ERROR=`echo $RESPONSE | jq .error`
ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes ERROR=$(echo "$ERROR" | tr -d '"') # remove surrounding quotes
RESPONSE=`echo $RESPONSE | jq .result[1]`
RESPONSE=`echo $RESPONSE | jq .result`
else else
if [ -f grpc_client ]; then if [ -f grpc_client ]; then
rm grpc_client rm grpc_client


+ 2
- 2
test/app/dummy_test.sh View File

@ -57,7 +57,7 @@ echo "... testing query with /abci_query 2"
# we should be able to look up the key # we should be able to look up the key
RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $KEY)&prove=false"` RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $KEY)&prove=false"`
RESPONSE=`echo $RESPONSE | jq .result[1].response.log`
RESPONSE=`echo $RESPONSE | jq .result.response.log`
set +e set +e
A=`echo $RESPONSE | grep 'exists'` A=`echo $RESPONSE | grep 'exists'`
@ -70,7 +70,7 @@ set -e
# we should not be able to look up the value # we should not be able to look up the value
RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $VALUE)&prove=false"` RESPONSE=`curl -s "127.0.0.1:46657/abci_query?path=\"\"&data=$(toHex $VALUE)&prove=false"`
RESPONSE=`echo $RESPONSE | jq .result[1].response.log`
RESPONSE=`echo $RESPONSE | jq .result.response.log`
set +e set +e
A=`echo $RESPONSE | grep 'exists'` A=`echo $RESPONSE | grep 'exists'`
if [[ $? == 0 ]]; then if [[ $? == 0 ]]; then


+ 5
- 5
test/p2p/atomic_broadcast/test.sh View File

@ -17,7 +17,7 @@ for i in `seq 1 $N`; do
addr=$(test/p2p/ip.sh $i):46657 addr=$(test/p2p/ip.sh $i):46657
# current state # current state
HASH1=`curl -s $addr/status | jq .result[1].latest_app_hash`
HASH1=`curl -s $addr/status | jq .result.latest_app_hash`
# - send a tx # - send a tx
TX=aadeadbeefbeefbeef0$i TX=aadeadbeefbeefbeef0$i
@ -26,15 +26,15 @@ for i in `seq 1 $N`; do
echo "" echo ""
# we need to wait another block to get the new app_hash # we need to wait another block to get the new app_hash
h1=`curl -s $addr/status | jq .result[1].latest_block_height`
h1=`curl -s $addr/status | jq .result.latest_block_height`
h2=$h1 h2=$h1
while [ "$h2" == "$h1" ]; do while [ "$h2" == "$h1" ]; do
sleep 1 sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
h2=`curl -s $addr/status | jq .result.latest_block_height`
done done
# check that hash was updated # check that hash was updated
HASH2=`curl -s $addr/status | jq .result[1].latest_app_hash`
HASH2=`curl -s $addr/status | jq .result.latest_app_hash`
if [[ "$HASH1" == "$HASH2" ]]; then if [[ "$HASH1" == "$HASH2" ]]; then
echo "Expected state hash to update from $HASH1. Got $HASH2" echo "Expected state hash to update from $HASH1. Got $HASH2"
exit 1 exit 1
@ -44,7 +44,7 @@ for i in `seq 1 $N`; do
for j in `seq 1 $N`; do for j in `seq 1 $N`; do
if [[ "$i" != "$j" ]]; then if [[ "$i" != "$j" ]]; then
addrJ=$(test/p2p/ip.sh $j):46657 addrJ=$(test/p2p/ip.sh $j):46657
HASH3=`curl -s $addrJ/status | jq .result[1].latest_app_hash`
HASH3=`curl -s $addrJ/status | jq .result.latest_app_hash`
if [[ "$HASH2" != "$HASH3" ]]; then if [[ "$HASH2" != "$HASH3" ]]; then
echo "App hash for node $j doesn't match. Got $HASH3, expected $HASH2" echo "App hash for node $j doesn't match. Got $HASH3, expected $HASH2"


+ 4
- 4
test/p2p/basic/test.sh View File

@ -31,19 +31,19 @@ for i in `seq 1 $N`; do
N_1=$(($N - 1)) N_1=$(($N - 1))
# - assert everyone has N-1 other peers # - assert everyone has N-1 other peers
N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'`
N_PEERS=`curl -s $addr/net_info | jq '.result.peers | length'`
while [ "$N_PEERS" != $N_1 ]; do while [ "$N_PEERS" != $N_1 ]; do
echo "Waiting for node $i to connect to all peers ..." echo "Waiting for node $i to connect to all peers ..."
sleep 1 sleep 1
N_PEERS=`curl -s $addr/net_info | jq '.result[1].peers | length'`
N_PEERS=`curl -s $addr/net_info | jq '.result.peers | length'`
done done
# - assert block height is greater than 1 # - assert block height is greater than 1
BLOCK_HEIGHT=`curl -s $addr/status | jq .result[1].latest_block_height`
BLOCK_HEIGHT=`curl -s $addr/status | jq .result.latest_block_height`
while [ "$BLOCK_HEIGHT" -le 1 ]; do while [ "$BLOCK_HEIGHT" -le 1 ]; do
echo "Waiting for node $i to commit a block ..." echo "Waiting for node $i to commit a block ..."
sleep 1 sleep 1
BLOCK_HEIGHT=`curl -s $addr/status | jq .result[1].latest_block_height`
BLOCK_HEIGHT=`curl -s $addr/status | jq .result.latest_block_height`
done done
echo "Node $i is connected to all peers and at block $BLOCK_HEIGHT" echo "Node $i is connected to all peers and at block $BLOCK_HEIGHT"
done done


+ 4
- 4
test/p2p/fast_sync/check_peer.sh View File

@ -15,10 +15,10 @@ peerID=$(( $(($ID % 4)) + 1 )) # 1->2 ... 3->4 ... 4->1
peer_addr=$(test/p2p/ip.sh $peerID):46657 peer_addr=$(test/p2p/ip.sh $peerID):46657
# get another peer's height # get another peer's height
h1=`curl -s $peer_addr/status | jq .result[1].latest_block_height`
h1=`curl -s $peer_addr/status | jq .result.latest_block_height`
# get another peer's state # get another peer's state
root1=`curl -s $peer_addr/status | jq .result[1].latest_app_hash`
root1=`curl -s $peer_addr/status | jq .result.latest_app_hash`
echo "Other peer is on height $h1 with state $root1" echo "Other peer is on height $h1 with state $root1"
echo "Waiting for peer $ID to catch up" echo "Waiting for peer $ID to catch up"
@ -29,12 +29,12 @@ set +o pipefail
h2="0" h2="0"
while [[ "$h2" -lt "$(($h1+3))" ]]; do while [[ "$h2" -lt "$(($h1+3))" ]]; do
sleep 1 sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
h2=`curl -s $addr/status | jq .result.latest_block_height`
echo "... $h2" echo "... $h2"
done done
# check the app hash # check the app hash
root2=`curl -s $addr/status | jq .result[1].latest_app_hash`
root2=`curl -s $addr/status | jq .result.latest_app_hash`
if [[ "$root1" != "$root2" ]]; then if [[ "$root1" != "$root2" ]]; then
echo "App hash after fast sync does not match. Got $root2; expected $root1" echo "App hash after fast sync does not match. Got $root2; expected $root1"


+ 3
- 3
test/p2p/kill_all/check_peers.sh View File

@ -23,7 +23,7 @@ set -e
# get the first peer's height # get the first peer's height
addr=$(test/p2p/ip.sh 1):46657 addr=$(test/p2p/ip.sh 1):46657
h1=$(curl -s "$addr/status" | jq .result[1].latest_block_height)
h1=$(curl -s "$addr/status" | jq .result.latest_block_height)
echo "1st peer is on height $h1" echo "1st peer is on height $h1"
echo "Waiting until other peers reporting a height higher than the 1st one" echo "Waiting until other peers reporting a height higher than the 1st one"
@ -33,14 +33,14 @@ for i in $(seq 2 "$NUM_OF_PEERS"); do
while [[ $hi -le $h1 ]] ; do while [[ $hi -le $h1 ]] ; do
addr=$(test/p2p/ip.sh "$i"):46657 addr=$(test/p2p/ip.sh "$i"):46657
hi=$(curl -s "$addr/status" | jq .result[1].latest_block_height)
hi=$(curl -s "$addr/status" | jq .result.latest_block_height)
echo "... peer $i is on height $hi" echo "... peer $i is on height $hi"
((attempt++)) ((attempt++))
if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then if [ "$attempt" -ge $MAX_ATTEMPTS_TO_CATCH_UP ] ; then
echo "$attempt unsuccessful attempts were made to catch up" echo "$attempt unsuccessful attempts were made to catch up"
curl -s "$addr/dump_consensus_state" | jq .result[1]
curl -s "$addr/dump_consensus_state" | jq .result
exit 1 exit 1
fi fi


+ 1
- 1
test/p2p/pex/check_peer.sh View File

@ -10,7 +10,7 @@ echo "2. wait until peer $ID connects to other nodes using pex reactor"
peers_count="0" peers_count="0"
while [[ "$peers_count" -lt "$((N-1))" ]]; do while [[ "$peers_count" -lt "$((N-1))" ]]; do
sleep 1 sleep 1
peers_count=$(curl -s "$addr/net_info" | jq ".result[1].peers | length")
peers_count=$(curl -s "$addr/net_info" | jq ".result.peers | length")
echo "... peers count = $peers_count, expected = $((N-1))" echo "... peers count = $peers_count, expected = $((N-1))"
done done


+ 2
- 2
test/persist/test_failure_indices.sh View File

@ -107,11 +107,11 @@ for failIndex in $(seq $failsStart $failsEnd); do
done done
# wait for a new block # wait for a new block
h1=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result[1].latest_block_height)
h1=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result.latest_block_height)
h2=$h1 h2=$h1
while [ "$h2" == "$h1" ]; do while [ "$h2" == "$h1" ]; do
sleep 1 sleep 1
h2=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result[1].latest_block_height)
h2=$(curl -s --unix-socket "$RPC_ADDR" http://localhost/status | jq .result.latest_block_height)
done done
kill_procs kill_procs


+ 2
- 2
test/persist/test_simple.sh View File

@ -57,11 +57,11 @@ while [ "$ERR" != 0 ]; do
done done
# wait for a new block # wait for a new block
h1=`curl -s $addr/status | jq .result[1].latest_block_height`
h1=`curl -s $addr/status | jq .result.latest_block_height`
h2=$h1 h2=$h1
while [ "$h2" == "$h1" ]; do while [ "$h2" == "$h1" ]; do
sleep 1 sleep 1
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
h2=`curl -s $addr/status | jq .result.latest_block_height`
done done
kill_procs kill_procs


+ 58
- 25
types/events.go View File

@ -3,7 +3,6 @@ package types
import ( import (
// for registering TMEventData as events.EventData // for registering TMEventData as events.EventData
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/events"
@ -34,10 +33,47 @@ func EventStringVote() string { return "Vote" }
//---------------------------------------- //----------------------------------------
var (
EventDataNameNewBlock = "new_block"
EventDataNameNewBlockHeader = "new_block_header"
EventDataNameTx = "tx"
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
)
//----------------------------------------
// implements events.EventData // implements events.EventData
type TMEventData interface {
type TMEventDataInner interface {
events.EventData events.EventData
AssertIsTMEventData()
}
type TMEventData struct {
TMEventDataInner `json:"unwrap"`
}
func (tmr TMEventData) MarshalJSON() ([]byte, error) {
return tmEventDataMapper.ToJSON(tmr.TMEventDataInner)
}
func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) {
parsed, err := tmEventDataMapper.FromJSON(data)
if err == nil && parsed != nil {
tmr.TMEventDataInner = parsed.(TMEventDataInner)
}
return
}
func (tmr TMEventData) Unwrap() TMEventDataInner {
tmrI := tmr.TMEventDataInner
for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) {
tmrI = wrap.TMEventDataInner
}
return tmrI
}
func (tmr TMEventData) Empty() bool {
return tmr.TMEventDataInner == nil
} }
const ( const (
@ -50,15 +86,12 @@ const (
EventDataTypeVote = byte(0x12) EventDataTypeVote = byte(0x12)
) )
var _ = wire.RegisterInterface(
struct{ TMEventData }{},
wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock},
wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader},
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState},
wire.ConcreteType{EventDataVote{}, EventDataTypeVote},
)
var tmEventDataMapper = data.NewMapper(TMEventData{}).
RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock).
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote)
// Most event messages are basic types (a block, a transaction) // Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic // but some (an input to a call tx or a receive) are more exotic
@ -147,55 +180,55 @@ func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEven
//--- block, tx, and vote events //--- block, tx, and vote events
func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) {
fireEvent(fireable, EventStringNewBlock(), block)
fireEvent(fireable, EventStringNewBlock(), TMEventData{block})
} }
func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) {
fireEvent(fireable, EventStringNewBlockHeader(), header)
fireEvent(fireable, EventStringNewBlockHeader(), TMEventData{header})
} }
func FireEventVote(fireable events.Fireable, vote EventDataVote) { func FireEventVote(fireable events.Fireable, vote EventDataVote) {
fireEvent(fireable, EventStringVote(), vote)
fireEvent(fireable, EventStringVote(), TMEventData{vote})
} }
func FireEventTx(fireable events.Fireable, tx EventDataTx) { func FireEventTx(fireable events.Fireable, tx EventDataTx) {
fireEvent(fireable, EventStringTx(tx.Tx), tx)
fireEvent(fireable, EventStringTx(tx.Tx), TMEventData{tx})
} }
//--- EventDataRoundState events //--- EventDataRoundState events
func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRoundStep(), rs)
fireEvent(fireable, EventStringNewRoundStep(), TMEventData{rs})
} }
func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutPropose(), rs)
fireEvent(fireable, EventStringTimeoutPropose(), TMEventData{rs})
} }
func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringTimeoutWait(), rs)
fireEvent(fireable, EventStringTimeoutWait(), TMEventData{rs})
} }
func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringNewRound(), rs)
fireEvent(fireable, EventStringNewRound(), TMEventData{rs})
} }
func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringCompleteProposal(), rs)
fireEvent(fireable, EventStringCompleteProposal(), TMEventData{rs})
} }
func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringPolka(), rs)
fireEvent(fireable, EventStringPolka(), TMEventData{rs})
} }
func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringUnlock(), rs)
fireEvent(fireable, EventStringUnlock(), TMEventData{rs})
} }
func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringRelock(), rs)
fireEvent(fireable, EventStringRelock(), TMEventData{rs})
} }
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringLock(), rs)
fireEvent(fireable, EventStringLock(), TMEventData{rs})
} }

Loading…
Cancel
Save