From 65ebc344ac26b8d7ca8b5e96fa9a6f6c180814db Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 27 Jun 2016 20:43:09 -0400 Subject: [PATCH] broadcast_commit --- node/node.go | 1 + rpc/core/mempool.go | 70 +++++++++++++++++++++++++++++++++++++++++++-- rpc/core/pipe.go | 6 ++++ rpc/core/routes.go | 9 ++++++ state/execution.go | 8 ++++++ types/events.go | 2 ++ types/tx.go | 9 +++++- 7 files changed, 102 insertions(+), 3 deletions(-) diff --git a/node/node.go b/node/node.go index 3d2c91966..231b6b650 100644 --- a/node/node.go +++ b/node/node.go @@ -177,6 +177,7 @@ func (n *Node) AddListener(l p2p.Listener) { func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetConfig(n.config) + rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetConsensusReactor(n.consensusReactor) diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index b0e5c0c43..f915ce426 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -2,14 +2,18 @@ package core import ( "fmt" + "time" + + "github.com/tendermint/go-events" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" ) //----------------------------------------------------------------------------- +// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!) -// NOTE: tx must be signed +// Returns right away, with no response func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempoolReactor.BroadcastTx(tx, nil) if err != nil { @@ -18,7 +22,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return &ctypes.ResultBroadcastTx{}, nil } -// Note: tx must be signed +// Returns with the response from CheckTx func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *tmsp.Response, 1) err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { @@ -36,6 +40,68 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } +// CONTRACT: returns error==nil iff the tx is included in a block. +// +// If CheckTx fails, return with the response from CheckTx AND an error. +// Else, block until the tx is included in a block, +// and return the result of AppendTx (with no error). +// Even if AppendTx fails, so long as the tx is included in a block this function +// will not return an error. +// The function times out after five minutes and returns the result of CheckTx and an error. +// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) +func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + + // subscribe to tx being committed in block + appendTxResCh := make(chan *tmsp.Response, 1) + eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) { + appendTxResCh <- data.(*tmsp.Response) + }) + + // broadcast the tx and register checktx callback + checkTxResCh := make(chan *tmsp.Response, 1) + err := mempoolReactor.BroadcastTx(tx, func(res *tmsp.Response) { + checkTxResCh <- res + }) + if err != nil { + return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + } + checkTxRes := <-checkTxResCh + checkTxR := checkTxRes.GetCheckTx() + if r := checkTxR; r.Code != tmsp.CodeType_OK { + // CheckTx failed! + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log) + } + + // Wait for the tx to be included in a block, + // timeout after something reasonable. + timer := time.NewTimer(60 * 5 * time.Second) + select { + case appendTxRes := <-appendTxResCh: + // The tx was included in a block. + // NOTE we don't return an error regardless of the AppendTx code; + // clients must check this to see if they need to send a new tx! + r := appendTxRes.GetAppendTx() + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, nil + case <-timer.C: + r := checkTxR + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + }, fmt.Errorf("Timed out waiting for transaction to be included in a block") + } + + panic("Should never happen!") +} + func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { txs := mempoolReactor.Mempool.Reap(0) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index de196e813..464f7fda9 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -4,12 +4,14 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/types" ) +var eventSwitch *events.EventSwitch var blockStore *bc.BlockStore var consensusState *consensus.ConsensusState var consensusReactor *consensus.ConsensusReactor @@ -24,6 +26,10 @@ func SetConfig(c cfg.Config) { config = c } +func SetEventSwitch(evsw *events.EventSwitch) { + eventSwitch = evsw +} + func SetBlockStore(bs *bc.BlockStore) { blockStore = bs } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 585a18cd2..b8938f6bc 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -19,6 +19,7 @@ var Routes = map[string]*rpc.RPCFunc{ "block": rpc.NewRPCFunc(BlockResult, "height"), "validators": rpc.NewRPCFunc(ValidatorsResult, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), + "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommitResult, "tx"), "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), @@ -126,6 +127,14 @@ func NumUnconfirmedTxsResult() (ctypes.TMResult, error) { } } +func BroadcastTxCommitResult(tx []byte) (ctypes.TMResult, error) { + if r, err := BroadcastTxCommit(tx); err != nil { + return nil, err + } else { + return r, nil + } +} + func BroadcastTxSyncResult(tx []byte) (ctypes.TMResult, error) { if r, err := BroadcastTxSync(tx); err != nil { return nil, err diff --git a/state/execution.go b/state/execution.go index 230bceb29..ad359f4d4 100644 --- a/state/execution.go +++ b/state/execution.go @@ -57,6 +57,8 @@ func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, // TODO: Generate a bitmap or otherwise store tx validity in state. func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block) error { + eventCache := events.NewEventCache(evsw) + var validTxs, invalidTxs = 0, 0 // Execute transactions and get hash @@ -73,6 +75,9 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) invalidTxs += 1 } + // NOTE: if we count we can access the tx from the block instead of + // pulling it from the req + eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), res) } } proxyAppConn.SetResponseCallback(proxyCb) @@ -95,6 +100,9 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy log.Info("TODO: Do something with changedValidators", changedValidators) log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) + + // fire events + eventCache.Flush() return nil } diff --git a/types/events.go b/types/events.go index 3328911c1..68313ff25 100644 --- a/types/events.go +++ b/types/events.go @@ -2,6 +2,7 @@ package types import ( // for registering TMEventData as events.EventData + . "github.com/tendermint/go-common" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -14,6 +15,7 @@ func EventStringUnbond() string { return "Unbond" } func EventStringRebond() string { return "Rebond" } func EventStringDupeout() string { return "Dupeout" } func EventStringFork() string { return "Fork" } +func EventStringTx(tx Tx) string { return Fmt("Tx:%X", tx.Hash()) } func EventStringNewBlock() string { return "NewBlock" } func EventStringNewBlockHeader() string { return "NewBlockHeader" } diff --git a/types/tx.go b/types/tx.go index 60699d534..40a042395 100644 --- a/types/tx.go +++ b/types/tx.go @@ -6,6 +6,13 @@ import ( type Tx []byte +// NOTE: this is the hash of the go-wire encoded Tx. +// Tx has no types at this level, so just length-prefixed. +// Maybe it should just be the hash of the bytes tho? +func (tx Tx) Hash() []byte { + return merkle.SimpleHashFromBinary(tx) +} + type Txs []Tx func (txs Txs) Hash() []byte { @@ -15,7 +22,7 @@ func (txs Txs) Hash() []byte { case 0: return nil case 1: - return merkle.SimpleHashFromBinary(txs[0]) + return txs[0].Hash() default: left := Txs(txs[:(len(txs)+1)/2]).Hash() right := Txs(txs[(len(txs)+1)/2:]).Hash()