Browse Source

index by bytes. add TxID to broadcast_tx responses

pull/412/head
Ethan Buchman 8 years ago
parent
commit
d7c5690f17
10 changed files with 41 additions and 26 deletions
  1. +5
    -1
      rpc/core/mempool.go
  2. +3
    -11
      rpc/core/routes.go
  3. +13
    -0
      rpc/core/tx.go
  4. +8
    -1
      rpc/core/types/responses.go
  5. +1
    -2
      state/execution.go
  6. +1
    -1
      state/tx/indexer.go
  7. +3
    -3
      state/tx/indexer/batch.go
  8. +3
    -3
      state/tx/indexer/kv.go
  9. +2
    -2
      state/tx/indexer/kv_test.go
  10. +2
    -2
      state/tx/indexer/null.go

+ 5
- 1
rpc/core/mempool.go View File

@ -18,7 +18,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
return &ctypes.ResultBroadcastTx{}, nil
return &ctypes.ResultBroadcastTx{TxID: tx.Hash()}, nil
}
// Returns with the response from CheckTx
@ -36,6 +36,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
Code: r.Code,
Data: r.Data,
Log: r.Log,
TxID: tx.Hash(),
}, nil
}
@ -67,6 +68,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
DeliverTx: nil,
TxID: tx.Hash(),
}, nil
}
@ -86,12 +88,14 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
DeliverTx: deliverTxR,
TxID: tx.Hash(),
}, nil
case <-timer.C:
log.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR,
DeliverTx: nil,
TxID: tx.Hash(),
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}


+ 3
- 11
rpc/core/routes.go View File

@ -1,12 +1,9 @@
package core
import (
"strings"
rpc "github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-rpc/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
// TODO: better system than "unsafe" prefix
@ -22,11 +19,11 @@ var Routes = map[string]*rpc.RPCFunc{
"genesis": rpc.NewRPCFunc(GenesisResult, ""),
"block": rpc.NewRPCFunc(BlockResult, "height"),
"commit": rpc.NewRPCFunc(CommitResult, "height"),
"tx": rpc.NewRPCFunc(TxResult, "hash"),
"validators": rpc.NewRPCFunc(ValidatorsResult, ""),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""),
"tx": rpc.NewRPCFunc(Tx, "hash"),
// broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"),
@ -103,13 +100,8 @@ func NumUnconfirmedTxsResult() (ctypes.TMResult, error) {
// Tx allow user to query the transaction results. `nil` could mean the
// transaction is in the mempool, invalidated, or was not send in the first
// place.
func Tx(hash string) (ctypes.TMResult, error) {
r, err := txIndexer.Tx(strings.ToLower(hash))
// nil-pointer interface values are forbidden in go-rpc
if r == (*types.TxResult)(nil) {
return nil, err
}
return r, err
func TxResult(hash []byte) (ctypes.TMResult, error) {
return Tx(hash)
}
func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) {


+ 13
- 0
rpc/core/tx.go View File

@ -0,0 +1,13 @@
package core
import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
func Tx(hash []byte) (*ctypes.ResultTx, error) {
r, err := txIndexer.Tx(hash)
if err != nil {
return nil, err
}
return &ctypes.ResultTx{*r}, nil
}

+ 8
- 1
rpc/core/types/responses.go View File

@ -67,11 +67,18 @@ type ResultBroadcastTx struct {
Code abci.CodeType `json:"code"`
Data []byte `json:"data"`
Log string `json:"log"`
TxID []byte `json:"tx_id"`
}
type ResultBroadcastTxCommit struct {
CheckTx *abci.ResponseCheckTx `json:"check_tx"`
DeliverTx *abci.ResponseDeliverTx `json:"deliver_tx"`
TxID []byte `json:"tx_id"`
}
type ResultTx struct {
types.TxResult
}
type ResultUnconfirmedTxs struct {
@ -164,7 +171,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit},
wire.ConcreteType{&types.TxResult{}, ResultTypeTx},
wire.ConcreteType{&ResultTx{}, ResultTypeTx},
wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},


+ 1
- 2
state/execution.go View File

@ -1,7 +1,6 @@
package state
import (
"encoding/hex"
"errors"
"fmt"
@ -246,7 +245,7 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
if r != nil {
tx := block.Txs[i]
// dd2e325f79f7e5f77788759d278c1d4b370c842e => {"height":2405, "index":0, ...}
batch.Index(hex.EncodeToString(tx.Hash()), *r)
batch.Index(tx.Hash(), *r)
}
}
s.TxIndexer.Batch(batch)


+ 1
- 1
state/tx/indexer.go View File

@ -17,5 +17,5 @@ type Indexer interface {
// Tx returns specified transaction or nil if the transaction is not indexed
// or stored.
Tx(hash string) (*types.TxResult, error)
Tx(hash []byte) (*types.TxResult, error)
}

+ 3
- 3
state/tx/indexer/batch.go View File

@ -18,11 +18,11 @@ func NewBatch() *Batch {
}
// Index adds or updates entry for the given hash.
func (b *Batch) Index(hash string, result types.TxResult) error {
if hash == "" {
func (b *Batch) Index(hash []byte, result types.TxResult) error {
if len(hash) == 0 {
return ErrorEmptyHash
}
b.Ops[hash] = result
b.Ops[string(hash)] = result
return nil
}


+ 3
- 3
state/tx/indexer/kv.go View File

@ -22,12 +22,12 @@ func NewKV(store db.DB) *KV {
// Tx gets transaction from the KV storage and returns it or nil if the
// transaction is not found.
func (indexer *KV) Tx(hash string) (*types.TxResult, error) {
if hash == "" {
func (indexer *KV) Tx(hash []byte) (*types.TxResult, error) {
if len(hash) == 0 {
return nil, ErrorEmptyHash
}
rawBytes := indexer.store.Get([]byte(hash))
rawBytes := indexer.store.Get(hash)
if rawBytes == nil {
return nil, nil
}


+ 2
- 2
state/tx/indexer/kv_test.go View File

@ -18,7 +18,7 @@ func TestKVIndex(t *testing.T) {
tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}
hash := string(tx.Hash())
hash := tx.Hash()
batch := NewBatch()
batch.Index(hash, *txResult)
@ -44,7 +44,7 @@ func benchmarkKVIndex(txsCount int, b *testing.B) {
batch := NewBatch()
for i := 0; i < txsCount; i++ {
batch.Index(fmt.Sprintf("hash%v", i), *txResult)
batch.Index([]byte(fmt.Sprintf("hash%v", i)), *txResult)
}
b.ResetTimer()


+ 2
- 2
state/tx/indexer/null.go View File

@ -9,8 +9,8 @@ import (
type Null struct{}
// Tx panics.
func (indexer *Null) Tx(hash string) (*types.TxResult, error) {
return nil, errors.New("Indexing is disabled (set `tx_indexer=kv` in config)")
func (indexer *Null) Tx(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_indexer = "kv"' in config)`)
}
// Batch returns nil.


Loading…
Cancel
Save