diff --git a/node/node.go b/node/node.go index 7a88f2d09..0eea6278f 100644 --- a/node/node.go +++ b/node/node.go @@ -10,12 +10,12 @@ import ( abci "github.com/tendermint/abci/types" cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-p2p" - "github.com/tendermint/go-rpc" - "github.com/tendermint/go-rpc/server" - "github.com/tendermint/go-wire" + p2p "github.com/tendermint/go-p2p" + rpc "github.com/tendermint/go-rpc" + rpcserver "github.com/tendermint/go-rpc/server" + wire "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" @@ -23,6 +23,8 @@ import ( rpccore "github.com/tendermint/tendermint/rpc/core" grpccore "github.com/tendermint/tendermint/rpc/grpc" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/tx" + txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -51,6 +53,7 @@ type Node struct { consensusReactor *consensus.ConsensusReactor // for participating in the consensus proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers + txIndexer tx.Indexer } func NewNodeDefault(config cfg.Config) *Node { @@ -82,7 +85,18 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato } // reload the state (it may have been updated by the handshake) - state = sm.GetState(config, stateDB) + state = sm.LoadState(stateDB) + + // Transaction indexing + var txIndexer tx.Indexer + switch config.GetString("tx_indexer") { + case "kv": + store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir")) + txIndexer = txindexer.NewKV(store) + default: + txIndexer = &txindexer.Null{} + } + state.TxIndexer = txIndexer // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -188,6 +202,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato consensusState: consensusState, consensusReactor: consensusReactor, proxyApp: proxyApp, + txIndexer: txIndexer, } node.BaseService = *cmn.NewBaseService(log, "Node", node) return node @@ -278,6 +293,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetAddrBook(n.addrBook) rpccore.SetProxyAppQuery(n.proxyApp.Query()) + rpccore.SetTxIndexer(n.txIndexer) } func (n *Node) startRPC() ([]net.Listener, error) { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index eefc226ad..dfe841365 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -4,9 +4,9 @@ import ( "fmt" "time" + abci "github.com/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" - abci "github.com/tendermint/abci/types" ) //----------------------------------------------------------------------------- @@ -65,7 +65,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { if checkTxR.Code != abci.CodeType_OK { // CheckTx failed! return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: nil, }, nil } @@ -84,13 +84,13 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { } log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: deliverTxR, }, nil case <-timer.C: log.Error("failed to include tx") return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR, + CheckTx: checkTxR, DeliverTx: nil, }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 123b13dd8..c92216428 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,11 +2,12 @@ package core import ( cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-p2p" + crypto "github.com/tendermint/go-crypto" + p2p "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/state/tx" "github.com/tendermint/tendermint/types" ) @@ -42,9 +43,10 @@ var ( p2pSwitch P2P // objects - pubKey crypto.PubKey - genDoc *types.GenesisDoc // cache the genesis structure - addrBook *p2p.AddrBook + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure + addrBook *p2p.AddrBook + txIndexer tx.Indexer ) func SetConfig(c cfg.Config) { @@ -86,3 +88,7 @@ func SetAddrBook(book *p2p.AddrBook) { func SetProxyAppQuery(appConn proxy.AppConnQuery) { proxyAppQuery = appConn } + +func SetTxIndexer(indexer tx.Indexer) { + txIndexer = indexer +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 643b2bf02..e79fb0299 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,9 +1,12 @@ package core import ( + "strings" + rpc "github.com/tendermint/go-rpc/server" "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" ) // TODO: better system than "unsafe" prefix @@ -23,6 +26,7 @@ var Routes = map[string]*rpc.RPCFunc{ "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "tx": rpc.NewRPCFunc(Tx, "hash"), // broadcast API "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"), @@ -45,185 +49,105 @@ var Routes = map[string]*rpc.RPCFunc{ } func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - if r, err := Subscribe(wsCtx, event); err != nil { - return nil, err - } else { - return r, nil - } + return Subscribe(wsCtx, event) } func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { - if r, err := Unsubscribe(wsCtx, event); err != nil { - return nil, err - } else { - return r, nil - } + return Unsubscribe(wsCtx, event) } func StatusResult() (ctypes.TMResult, error) { - if r, err := Status(); err != nil { - return nil, err - } else { - return r, nil - } + return Status() } func NetInfoResult() (ctypes.TMResult, error) { - if r, err := NetInfo(); err != nil { - return nil, err - } else { - return r, nil - } + return NetInfo() } func UnsafeDialSeedsResult(seeds []string) (ctypes.TMResult, error) { - if r, err := UnsafeDialSeeds(seeds); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeDialSeeds(seeds) } func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { - if r, err := BlockchainInfo(min, max); err != nil { - return nil, err - } else { - return r, nil - } + return BlockchainInfo(min, max) } func GenesisResult() (ctypes.TMResult, error) { - if r, err := Genesis(); err != nil { - return nil, err - } else { - return r, nil - } + return Genesis() } func BlockResult(height int) (ctypes.TMResult, error) { - if r, err := Block(height); err != nil { - return nil, err - } else { - return r, nil - } + return Block(height) } func CommitResult(height int) (ctypes.TMResult, error) { - if r, err := Commit(height); err != nil { - return nil, err - } else { - return r, nil - } + return Commit(height) } func ValidatorsResult() (ctypes.TMResult, error) { - if r, err := Validators(); err != nil { - return nil, err - } else { - return r, nil - } + return Validators() } func DumpConsensusStateResult() (ctypes.TMResult, error) { - if r, err := DumpConsensusState(); err != nil { - return nil, err - } else { - return r, nil - } + return DumpConsensusState() } func UnconfirmedTxsResult() (ctypes.TMResult, error) { - if r, err := UnconfirmedTxs(); err != nil { - return nil, err - } else { - return r, nil - } + return UnconfirmedTxs() } func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { - if r, err := NumUnconfirmedTxs(); err != nil { + return NumUnconfirmedTxs() +} + +// Tx allow user to query the transaction results. `nil` could mean the +// transaction is in the mempool, invalidated, or was not send in the first +// place. +func Tx(hash string) (ctypes.TMResult, error) { + r, err := txIndexer.Tx(strings.ToLower(hash)) + // nil-pointer interface values are forbidden in go-rpc + if r == (*types.TxResult)(nil) { return nil, err - } else { - return r, nil } + return r, err } func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxCommit(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxCommit(tx) } func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxSync(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxSync(tx) } func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { - if r, err := BroadcastTxAsync(tx); err != nil { - return nil, err - } else { - return r, nil - } + return BroadcastTxAsync(tx) } func ABCIQueryResult(path string, data []byte, prove bool) (ctypes.TMResult, error) { - if r, err := ABCIQuery(path, data, prove); err != nil { - return nil, err - } else { - return r, nil - } + return ABCIQuery(path, data, prove) } func ABCIInfoResult() (ctypes.TMResult, error) { - if r, err := ABCIInfo(); err != nil { - return nil, err - } else { - return r, nil - } + return ABCIInfo() } func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { - if r, err := UnsafeFlushMempool(); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeFlushMempool() } func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { - if r, err := UnsafeSetConfig(typ, key, value); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeSetConfig(typ, key, value) } func UnsafeStartCPUProfilerResult(filename string) (ctypes.TMResult, error) { - if r, err := UnsafeStartCPUProfiler(filename); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeStartCPUProfiler(filename) } func UnsafeStopCPUProfilerResult() (ctypes.TMResult, error) { - if r, err := UnsafeStopCPUProfiler(); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeStopCPUProfiler() } func UnsafeWriteHeapProfileResult(filename string) (ctypes.TMResult, error) { - if r, err := UnsafeWriteHeapProfile(filename); err != nil { - return nil, err - } else { - return r, nil - } + return UnsafeWriteHeapProfile(filename) } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index bcab4f59c..908a33f19 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -127,6 +127,7 @@ const ( ResultTypeBroadcastTx = byte(0x60) ResultTypeUnconfirmedTxs = byte(0x61) ResultTypeBroadcastTxCommit = byte(0x62) + ResultTypeTx = byte(0x63) // 0x7 bytes are for querying the application ResultTypeABCIQuery = byte(0x70) @@ -163,6 +164,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, + wire.ConcreteType{&types.TxResult{}, ResultTypeTx}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, diff --git a/state/execution.go b/state/execution.go index 0a82c6da9..bc125d4e3 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,6 +1,7 @@ package state import ( + "encoding/hex" "errors" "fmt" @@ -251,7 +252,8 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn for i, r := range txResults { if r != nil { tx := block.Txs[i] - batch.Index(string(tx.Hash()), *r) + // dd2e325f79f7e5f77788759d278c1d4b370c842e => {"height":2405, "index":0, ...} + batch.Index(hex.EncodeToString(tx.Hash()), *r) } } s.TxIndexer.Batch(batch) diff --git a/state/state.go b/state/state.go index 347718f1e..cecb38418 100644 --- a/state/state.go +++ b/state/state.go @@ -44,13 +44,12 @@ type State struct { TxIndexer tx.Indexer `json:"-"` // Transaction indexer. } -// Used in tests. func LoadState(db dbm.DB) *State { return loadState(db, stateKey) } func loadState(db dbm.DB, key []byte) *State { - s := &State{db: db} + s := &State{db: db, TxIndexer: &txindexer.Null{}} buf := db.Get(key) if len(buf) == 0 { return nil @@ -132,15 +131,6 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { state.Save() } - // Transaction indexing - switch config.GetString("tx_indexer") { - case "kv": - store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir")) - state.TxIndexer = txindexer.NewKV(store) - default: - state.TxIndexer = &txindexer.Null{} - } - return state } diff --git a/state/tx/indexer/null.go b/state/tx/indexer/null.go index b196a7056..c9df50edb 100644 --- a/state/tx/indexer/null.go +++ b/state/tx/indexer/null.go @@ -1,13 +1,16 @@ package indexer -import "github.com/tendermint/tendermint/types" +import ( + "errors" + "github.com/tendermint/tendermint/types" +) // Null acts as a /dev/null. type Null struct{} // Tx panics. func (indexer *Null) Tx(hash string) (*types.TxResult, error) { - panic("You are trying to get the transaction from a null indexer") + return nil, errors.New("Indexing is disabled (set `tx_indexer=kv` in config)") } // Batch returns nil. diff --git a/types/tx_result.go b/types/tx_result.go index c3ec00304..c1fc717d3 100644 --- a/types/tx_result.go +++ b/types/tx_result.go @@ -8,7 +8,7 @@ import ( // // One usage is indexing transaction results. type TxResult struct { - Height uint64 - Index uint32 - DeliverTxResponse abci.ResponseDeliverTx + Height uint64 `json:"height"` + Index uint32 `json:"index"` + DeliverTx abci.ResponseDeliverTx `json:"deliver_tx"` }