Browse Source

expose `/tx?hash="XXXXXXXXXXXX"` RPC call

pull/412/head
Anton Kaliaev 8 years ago
parent
commit
63704454a3
No known key found for this signature in database GPG Key ID: 7B6881D965918214
9 changed files with 88 additions and 145 deletions
  1. +22
    -6
      node/node.go
  2. +4
    -4
      rpc/core/mempool.go
  3. +11
    -5
      rpc/core/pipe.go
  4. +37
    -113
      rpc/core/routes.go
  5. +2
    -0
      rpc/core/types/responses.go
  6. +3
    -1
      state/execution.go
  7. +1
    -11
      state/state.go
  8. +5
    -2
      state/tx/indexer/null.go
  9. +3
    -3
      types/tx_result.go

+ 22
- 6
node/node.go View File

@ -10,12 +10,12 @@ import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/go-common" cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-rpc"
"github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-wire"
p2p "github.com/tendermint/go-p2p"
rpc "github.com/tendermint/go-rpc"
rpcserver "github.com/tendermint/go-rpc/server"
wire "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
@ -23,6 +23,8 @@ import (
rpccore "github.com/tendermint/tendermint/rpc/core" rpccore "github.com/tendermint/tendermint/rpc/core"
grpccore "github.com/tendermint/tendermint/rpc/grpc" grpccore "github.com/tendermint/tendermint/rpc/grpc"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/tx"
txindexer "github.com/tendermint/tendermint/state/tx/indexer"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version" "github.com/tendermint/tendermint/version"
@ -51,6 +53,7 @@ type Node struct {
consensusReactor *consensus.ConsensusReactor // for participating in the consensus consensusReactor *consensus.ConsensusReactor // for participating in the consensus
proxyApp proxy.AppConns // connection to the application proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers rpcListeners []net.Listener // rpc servers
txIndexer tx.Indexer
} }
func NewNodeDefault(config cfg.Config) *Node { func NewNodeDefault(config cfg.Config) *Node {
@ -82,7 +85,18 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
} }
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.GetState(config, stateDB)
state = sm.LoadState(stateDB)
// Transaction indexing
var txIndexer tx.Indexer
switch config.GetString("tx_indexer") {
case "kv":
store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir"))
txIndexer = txindexer.NewKV(store)
default:
txIndexer = &txindexer.Null{}
}
state.TxIndexer = txIndexer
// Generate node PrivKey // Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
@ -188,6 +202,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
consensusState: consensusState, consensusState: consensusState,
consensusReactor: consensusReactor, consensusReactor: consensusReactor,
proxyApp: proxyApp, proxyApp: proxyApp,
txIndexer: txIndexer,
} }
node.BaseService = *cmn.NewBaseService(log, "Node", node) node.BaseService = *cmn.NewBaseService(log, "Node", node)
return node return node
@ -278,6 +293,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetGenesisDoc(n.genesisDoc)
rpccore.SetAddrBook(n.addrBook) rpccore.SetAddrBook(n.addrBook)
rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
} }
func (n *Node) startRPC() ([]net.Listener, error) { func (n *Node) startRPC() ([]net.Listener, error) {


+ 4
- 4
rpc/core/mempool.go View File

@ -4,9 +4,9 @@ import (
"fmt" "fmt"
"time" "time"
abci "github.com/tendermint/abci/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
) )
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -65,7 +65,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
if checkTxR.Code != abci.CodeType_OK { if checkTxR.Code != abci.CodeType_OK {
// CheckTx failed! // CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: nil, DeliverTx: nil,
}, nil }, nil
} }
@ -84,13 +84,13 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
} }
log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR) log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: deliverTxR, DeliverTx: deliverTxR,
}, nil }, nil
case <-timer.C: case <-timer.C:
log.Error("failed to include tx") log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
CheckTx: checkTxR,
DeliverTx: nil, DeliverTx: nil,
}, fmt.Errorf("Timed out waiting for transaction to be included in a block") }, fmt.Errorf("Timed out waiting for transaction to be included in a block")
} }


+ 11
- 5
rpc/core/pipe.go View File

@ -2,11 +2,12 @@ package core
import ( import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p"
crypto "github.com/tendermint/go-crypto"
p2p "github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/tx"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -42,9 +43,10 @@ var (
p2pSwitch P2P p2pSwitch P2P
// objects // objects
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
txIndexer tx.Indexer
) )
func SetConfig(c cfg.Config) { func SetConfig(c cfg.Config) {
@ -86,3 +88,7 @@ func SetAddrBook(book *p2p.AddrBook) {
func SetProxyAppQuery(appConn proxy.AppConnQuery) { func SetProxyAppQuery(appConn proxy.AppConnQuery) {
proxyAppQuery = appConn proxyAppQuery = appConn
} }
func SetTxIndexer(indexer tx.Indexer) {
txIndexer = indexer
}

+ 37
- 113
rpc/core/routes.go View File

@ -1,9 +1,12 @@
package core package core
import ( import (
"strings"
rpc "github.com/tendermint/go-rpc/server" rpc "github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-rpc/types" "github.com/tendermint/go-rpc/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
) )
// TODO: better system than "unsafe" prefix // TODO: better system than "unsafe" prefix
@ -23,6 +26,7 @@ var Routes = map[string]*rpc.RPCFunc{
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
"tx": rpc.NewRPCFunc(Tx, "hash"),
// broadcast API // broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"), "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"),
@ -45,185 +49,105 @@ var Routes = map[string]*rpc.RPCFunc{
} }
func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Subscribe(wsCtx, event); err != nil {
return nil, err
} else {
return r, nil
}
return Subscribe(wsCtx, event)
} }
func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) {
if r, err := Unsubscribe(wsCtx, event); err != nil {
return nil, err
} else {
return r, nil
}
return Unsubscribe(wsCtx, event)
} }
func StatusResult() (ctypes.TMResult, error) { func StatusResult() (ctypes.TMResult, error) {
if r, err := Status(); err != nil {
return nil, err
} else {
return r, nil
}
return Status()
} }
func NetInfoResult() (ctypes.TMResult, error) { func NetInfoResult() (ctypes.TMResult, error) {
if r, err := NetInfo(); err != nil {
return nil, err
} else {
return r, nil
}
return NetInfo()
} }
func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) { func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) {
if r, err := UnsafeDialSeeds(seeds); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeDialSeeds(seeds)
} }
func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) {
if r, err := BlockchainInfo(min, max); err != nil {
return nil, err
} else {
return r, nil
}
return BlockchainInfo(min, max)
} }
func GenesisResult() (ctypes.TMResult, error) { func GenesisResult() (ctypes.TMResult, error) {
if r, err := Genesis(); err != nil {
return nil, err
} else {
return r, nil
}
return Genesis()
} }
func BlockResult(height int) (ctypes.TMResult, error) { func BlockResult(height int) (ctypes.TMResult, error) {
if r, err := Block(height); err != nil {
return nil, err
} else {
return r, nil
}
return Block(height)
} }
func CommitResult(height int) (ctypes.TMResult, error) { func CommitResult(height int) (ctypes.TMResult, error) {
if r, err := Commit(height); err != nil {
return nil, err
} else {
return r, nil
}
return Commit(height)
} }
func ValidatorsResult() (ctypes.TMResult, error) { func ValidatorsResult() (ctypes.TMResult, error) {
if r, err := Validators(); err != nil {
return nil, err
} else {
return r, nil
}
return Validators()
} }
func DumpConsensusStateResult() (ctypes.TMResult, error) { func DumpConsensusStateResult() (ctypes.TMResult, error) {
if r, err := DumpConsensusState(); err != nil {
return nil, err
} else {
return r, nil
}
return DumpConsensusState()
} }
func UnconfirmedTxsResult() (ctypes.TMResult, error) { func UnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := UnconfirmedTxs(); err != nil {
return nil, err
} else {
return r, nil
}
return UnconfirmedTxs()
} }
func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { func NumUnconfirmedTxsResult() (ctypes.TMResult, error) {
if r, err := NumUnconfirmedTxs(); err != nil {
return NumUnconfirmedTxs()
}
// Tx allow user to query the transaction results. `nil` could mean the
// transaction is in the mempool, invalidated, or was not send in the first
// place.
func Tx(hash string) (ctypes.TMResult, error) {
r, err := txIndexer.Tx(strings.ToLower(hash))
// nil-pointer interface values are forbidden in go-rpc
if r == (*types.TxResult)(nil) {
return nil, err return nil, err
} else {
return r, nil
} }
return r, err
} }
func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) { func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxCommit(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxCommit(tx)
} }
func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxSync(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxSync(tx)
} }
func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) {
if r, err := BroadcastTxAsync(tx); err != nil {
return nil, err
} else {
return r, nil
}
return BroadcastTxAsync(tx)
} }
func ABCIQueryResult(path string, data []byte, prove bool) (ctypes.TMResult, error) { func ABCIQueryResult(path string, data []byte, prove bool) (ctypes.TMResult, error) {
if r, err := ABCIQuery(path, data, prove); err != nil {
return nil, err
} else {
return r, nil
}
return ABCIQuery(path, data, prove)
} }
func ABCIInfoResult() (ctypes.TMResult, error) { func ABCIInfoResult() (ctypes.TMResult, error) {
if r, err := ABCIInfo(); err != nil {
return nil, err
} else {
return r, nil
}
return ABCIInfo()
} }
func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { func UnsafeFlushMempoolResult() (ctypes.TMResult, error) {
if r, err := UnsafeFlushMempool(); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeFlushMempool()
} }
func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) {
if r, err := UnsafeSetConfig(typ, key, value); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeSetConfig(typ, key, value)
} }
func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) { func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeStartCPUProfiler(filename); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeStartCPUProfiler(filename)
} }
func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) { func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) {
if r, err := UnsafeStopCPUProfiler(); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeStopCPUProfiler()
} }
func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) { func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) {
if r, err := UnsafeWriteHeapProfile(filename); err != nil {
return nil, err
} else {
return r, nil
}
return UnsafeWriteHeapProfile(filename)
} }

+ 2
- 0
rpc/core/types/responses.go View File

@ -127,6 +127,7 @@ const (
ResultTypeBroadcastTx = byte(0x60) ResultTypeBroadcastTx = byte(0x60)
ResultTypeUnconfirmedTxs = byte(0x61) ResultTypeUnconfirmedTxs = byte(0x61)
ResultTypeBroadcastTxCommit = byte(0x62) ResultTypeBroadcastTxCommit = byte(0x62)
ResultTypeTx = byte(0x63)
// 0x7 bytes are for querying the application // 0x7 bytes are for querying the application
ResultTypeABCIQuery = byte(0x70) ResultTypeABCIQuery = byte(0x70)
@ -163,6 +164,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit},
wire.ConcreteType{&types.TxResult{}, ResultTypeTx},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},


+ 3
- 1
state/execution.go View File

@ -1,6 +1,7 @@
package state package state
import ( import (
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -251,7 +252,8 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
for i, r := range txResults { for i, r := range txResults {
if r != nil { if r != nil {
tx := block.Txs[i] tx := block.Txs[i]
batch.Index(string(tx.Hash()), *r)
// dd2e325f79f7e5f77788759d278c1d4b370c842e => {"height":2405, "index":0, ...}
batch.Index(hex.EncodeToString(tx.Hash()), *r)
} }
} }
s.TxIndexer.Batch(batch) s.TxIndexer.Batch(batch)


+ 1
- 11
state/state.go View File

@ -44,13 +44,12 @@ type State struct {
TxIndexer tx.Indexer `json:"-"` // Transaction indexer. TxIndexer tx.Indexer `json:"-"` // Transaction indexer.
} }
// Used in tests.
func LoadState(db dbm.DB) *State { func LoadState(db dbm.DB) *State {
return loadState(db, stateKey) return loadState(db, stateKey)
} }
func loadState(db dbm.DB, key []byte) *State { func loadState(db dbm.DB, key []byte) *State {
s := &State{db: db}
s := &State{db: db, TxIndexer: &txindexer.Null{}}
buf := db.Get(key) buf := db.Get(key)
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil
@ -132,15 +131,6 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State {
state.Save() state.Save()
} }
// Transaction indexing
switch config.GetString("tx_indexer") {
case "kv":
store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir"))
state.TxIndexer = txindexer.NewKV(store)
default:
state.TxIndexer = &txindexer.Null{}
}
return state return state
} }


+ 5
- 2
state/tx/indexer/null.go View File

@ -1,13 +1,16 @@
package indexer package indexer
import "github.com/tendermint/tendermint/types"
import (
"errors"
"github.com/tendermint/tendermint/types"
)
// Null acts as a /dev/null. // Null acts as a /dev/null.
type Null struct{} type Null struct{}
// Tx panics. // Tx panics.
func (indexer *Null) Tx(hash string) (*types.TxResult, error) { func (indexer *Null) Tx(hash string) (*types.TxResult, error) {
panic("You are trying to get the transaction from a null indexer")
return nil, errors.New("Indexing is disabled (set `tx_indexer=kv` in config)")
} }
// Batch returns nil. // Batch returns nil.


+ 3
- 3
types/tx_result.go View File

@ -8,7 +8,7 @@ import (
// //
// One usage is indexing transaction results. // One usage is indexing transaction results.
type TxResult struct { type TxResult struct {
Height uint64
Index uint32
DeliverTxResponse abci.ResponseDeliverTx
Height uint64 `json:"height"`
Index uint32 `json:"index"`
DeliverTx abci.ResponseDeliverTx `json:"deliver_tx"`
} }

Loading…
Cancel
Save