|
|
@ -21,7 +21,7 @@ import ( |
|
|
|
// CheckTx nor DeliverTx results.
|
|
|
|
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
|
|
|
|
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { |
|
|
|
err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) |
|
|
|
err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
@ -34,7 +34,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca |
|
|
|
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
|
|
|
|
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { |
|
|
|
resCh := make(chan *abci.Response, 1) |
|
|
|
err := mempool.CheckTx(tx, func(res *abci.Response) { |
|
|
|
err := env.Mempool.CheckTx(tx, func(res *abci.Response) { |
|
|
|
resCh <- res |
|
|
|
}, mempl.TxInfo{}) |
|
|
|
if err != nil { |
|
|
@ -56,31 +56,31 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas |
|
|
|
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { |
|
|
|
subscriber := ctx.RemoteAddr() |
|
|
|
|
|
|
|
if eventBus.NumClients() >= config.MaxSubscriptionClients { |
|
|
|
return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) |
|
|
|
} else if eventBus.NumClientSubscriptions(subscriber) >= config.MaxSubscriptionsPerClient { |
|
|
|
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) |
|
|
|
if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { |
|
|
|
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) |
|
|
|
} else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient { |
|
|
|
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) |
|
|
|
} |
|
|
|
|
|
|
|
// Subscribe to tx being committed in block.
|
|
|
|
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) |
|
|
|
defer cancel() |
|
|
|
q := types.EventQueryTxFor(tx) |
|
|
|
deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q) |
|
|
|
deliverTxSub, err := env.EventBus.Subscribe(subCtx, subscriber, q) |
|
|
|
if err != nil { |
|
|
|
err = errors.Wrap(err, "failed to subscribe to tx") |
|
|
|
logger.Error("Error on broadcast_tx_commit", "err", err) |
|
|
|
err = fmt.Errorf("failed to subscribe to tx: %w", err) |
|
|
|
env.Logger.Error("Error on broadcast_tx_commit", "err", err) |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
defer eventBus.Unsubscribe(context.Background(), subscriber, q) |
|
|
|
defer env.EventBus.Unsubscribe(context.Background(), subscriber, q) |
|
|
|
|
|
|
|
// Broadcast tx and wait for CheckTx result
|
|
|
|
checkTxResCh := make(chan *abci.Response, 1) |
|
|
|
err = mempool.CheckTx(tx, func(res *abci.Response) { |
|
|
|
err = env.Mempool.CheckTx(tx, func(res *abci.Response) { |
|
|
|
checkTxResCh <- res |
|
|
|
}, mempl.TxInfo{}) |
|
|
|
if err != nil { |
|
|
|
logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
env.Logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) |
|
|
|
} |
|
|
|
checkTxResMsg := <-checkTxResCh |
|
|
@ -111,15 +111,15 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc |
|
|
|
reason = deliverTxSub.Err().Error() |
|
|
|
} |
|
|
|
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) |
|
|
|
logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
env.Logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
return &ctypes.ResultBroadcastTxCommit{ |
|
|
|
CheckTx: *checkTxRes, |
|
|
|
DeliverTx: abci.ResponseDeliverTx{}, |
|
|
|
Hash: tx.Hash(), |
|
|
|
}, err |
|
|
|
case <-time.After(config.TimeoutBroadcastTxCommit): |
|
|
|
case <-time.After(env.Config.TimeoutBroadcastTxCommit): |
|
|
|
err = errors.New("timed out waiting for tx to be included in a block") |
|
|
|
logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
env.Logger.Error("Error on broadcastTxCommit", "err", err) |
|
|
|
return &ctypes.ResultBroadcastTxCommit{ |
|
|
|
CheckTx: *checkTxRes, |
|
|
|
DeliverTx: abci.ResponseDeliverTx{}, |
|
|
@ -135,11 +135,11 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed |
|
|
|
// reuse per_page validator
|
|
|
|
limit = validatePerPage(limit) |
|
|
|
|
|
|
|
txs := mempool.ReapMaxTxs(limit) |
|
|
|
txs := env.Mempool.ReapMaxTxs(limit) |
|
|
|
return &ctypes.ResultUnconfirmedTxs{ |
|
|
|
Count: len(txs), |
|
|
|
Total: mempool.Size(), |
|
|
|
TotalBytes: mempool.TxsBytes(), |
|
|
|
Total: env.Mempool.Size(), |
|
|
|
TotalBytes: env.Mempool.TxsBytes(), |
|
|
|
Txs: txs}, nil |
|
|
|
} |
|
|
|
|
|
|
@ -147,7 +147,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed |
|
|
|
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
|
|
|
|
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { |
|
|
|
return &ctypes.ResultUnconfirmedTxs{ |
|
|
|
Count: mempool.Size(), |
|
|
|
Total: mempool.Size(), |
|
|
|
TotalBytes: mempool.TxsBytes()}, nil |
|
|
|
Count: env.Mempool.Size(), |
|
|
|
Total: env.Mempool.Size(), |
|
|
|
TotalBytes: env.Mempool.TxsBytes()}, nil |
|
|
|
} |