@ -20,7 +20,7 @@ import (
// CheckTx nor DeliverTx results.
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTx , error ) {
func BroadcastTxAsync ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTx , error ) {
err := m empool. CheckTx ( tx , nil , mempl . TxInfo { } )
err := env . M empool. CheckTx ( tx , nil , mempl . TxInfo { } )
if err != nil {
if err != nil {
return nil , err
return nil , err
@ -33,7 +33,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTx , error ) {
func BroadcastTxSync ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTx , error ) {
resCh := make ( chan * abci . Response , 1 )
resCh := make ( chan * abci . Response , 1 )
err := m empool. CheckTx ( tx , func ( res * abci . Response ) {
err := env . M empool. CheckTx ( tx , func ( res * abci . Response ) {
resCh <- res
resCh <- res
} , mempl . TxInfo { } )
} , mempl . TxInfo { } )
if err != nil {
if err != nil {
@ -55,31 +55,31 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
func BroadcastTxCommit ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTxCommit , error ) {
func BroadcastTxCommit ( ctx * rpctypes . Context , tx types . Tx ) ( * ctypes . ResultBroadcastTxCommit , error ) {
subscriber := ctx . RemoteAddr ( )
subscriber := ctx . RemoteAddr ( )
if eventBus . NumClients ( ) >= c onfig. MaxSubscriptionClients {
return nil , fmt . Errorf ( "max_subscription_clients %d reached" , c onfig. MaxSubscriptionClients )
} else if eventBus . NumClientSubscriptions ( subscriber ) >= c onfig. MaxSubscriptionsPerClient {
return nil , fmt . Errorf ( "max_subscriptions_per_client %d reached" , c onfig. MaxSubscriptionsPerClient )
if env . E ventBus . NumClients ( ) >= env . C onfig. MaxSubscriptionClients {
return nil , fmt . Errorf ( "max_subscription_clients %d reached" , env . C onfig. MaxSubscriptionClients )
} else if env . E ventBus . NumClientSubscriptions ( subscriber ) >= env . C onfig. MaxSubscriptionsPerClient {
return nil , fmt . Errorf ( "max_subscriptions_per_client %d reached" , env . C onfig. MaxSubscriptionsPerClient )
}
}
// Subscribe to tx being committed in block.
// Subscribe to tx being committed in block.
subCtx , cancel := context . WithTimeout ( ctx . Context ( ) , SubscribeTimeout )
subCtx , cancel := context . WithTimeout ( ctx . Context ( ) , SubscribeTimeout )
defer cancel ( )
defer cancel ( )
q := types . EventQueryTxFor ( tx )
q := types . EventQueryTxFor ( tx )
deliverTxSub , err := eventBus . Subscribe ( subCtx , subscriber , q )
deliverTxSub , err := env . E ventBus . Subscribe ( subCtx , subscriber , q )
if err != nil {
if err != nil {
err = fmt . Errorf ( "failed to subscribe to tx: %w" , err )
err = fmt . Errorf ( "failed to subscribe to tx: %w" , err )
l ogger. Error ( "Error on broadcast_tx_commit" , "err" , err )
env . L ogger. Error ( "Error on broadcast_tx_commit" , "err" , err )
return nil , err
return nil , err
}
}
defer eventBus . Unsubscribe ( context . Background ( ) , subscriber , q )
defer env . E ventBus . Unsubscribe ( context . Background ( ) , subscriber , q )
// Broadcast tx and wait for CheckTx result
// Broadcast tx and wait for CheckTx result
checkTxResCh := make ( chan * abci . Response , 1 )
checkTxResCh := make ( chan * abci . Response , 1 )
err = m empool. CheckTx ( tx , func ( res * abci . Response ) {
err = env . M empool. CheckTx ( tx , func ( res * abci . Response ) {
checkTxResCh <- res
checkTxResCh <- res
} , mempl . TxInfo { } )
} , mempl . TxInfo { } )
if err != nil {
if err != nil {
l ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
env . L ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
return nil , fmt . Errorf ( "error on broadcastTxCommit: %v" , err )
return nil , fmt . Errorf ( "error on broadcastTxCommit: %v" , err )
}
}
checkTxResMsg := <- checkTxResCh
checkTxResMsg := <- checkTxResCh
@ -110,15 +110,15 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
reason = deliverTxSub . Err ( ) . Error ( )
reason = deliverTxSub . Err ( ) . Error ( )
}
}
err = fmt . Errorf ( "deliverTxSub was cancelled (reason: %s)" , reason )
err = fmt . Errorf ( "deliverTxSub was cancelled (reason: %s)" , reason )
l ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
env . L ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
return & ctypes . ResultBroadcastTxCommit {
return & ctypes . ResultBroadcastTxCommit {
CheckTx : * checkTxRes ,
CheckTx : * checkTxRes ,
DeliverTx : abci . ResponseDeliverTx { } ,
DeliverTx : abci . ResponseDeliverTx { } ,
Hash : tx . Hash ( ) ,
Hash : tx . Hash ( ) ,
} , err
} , err
case <- time . After ( c onfig. TimeoutBroadcastTxCommit ) :
case <- time . After ( env . C onfig. TimeoutBroadcastTxCommit ) :
err = errors . New ( "timed out waiting for tx to be included in a block" )
err = errors . New ( "timed out waiting for tx to be included in a block" )
l ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
env . L ogger. Error ( "Error on broadcastTxCommit" , "err" , err )
return & ctypes . ResultBroadcastTxCommit {
return & ctypes . ResultBroadcastTxCommit {
CheckTx : * checkTxRes ,
CheckTx : * checkTxRes ,
DeliverTx : abci . ResponseDeliverTx { } ,
DeliverTx : abci . ResponseDeliverTx { } ,
@ -134,11 +134,11 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed
// reuse per_page validator
// reuse per_page validator
limit = validatePerPage ( limit )
limit = validatePerPage ( limit )
txs := m empool. ReapMaxTxs ( limit )
txs := env . M empool. ReapMaxTxs ( limit )
return & ctypes . ResultUnconfirmedTxs {
return & ctypes . ResultUnconfirmedTxs {
Count : len ( txs ) ,
Count : len ( txs ) ,
Total : m empool. Size ( ) ,
TotalBytes : m empool. TxsBytes ( ) ,
Total : env . M empool. Size ( ) ,
TotalBytes : env . M empool. TxsBytes ( ) ,
Txs : txs } , nil
Txs : txs } , nil
}
}
@ -146,7 +146,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func NumUnconfirmedTxs ( ctx * rpctypes . Context ) ( * ctypes . ResultUnconfirmedTxs , error ) {
func NumUnconfirmedTxs ( ctx * rpctypes . Context ) ( * ctypes . ResultUnconfirmedTxs , error ) {
return & ctypes . ResultUnconfirmedTxs {
return & ctypes . ResultUnconfirmedTxs {
Count : m empool. Size ( ) ,
Total : m empool. Size ( ) ,
TotalBytes : m empool. TxsBytes ( ) } , nil
Count : env . M empool. Size ( ) ,
Total : env . M empool. Size ( ) ,
TotalBytes : env . M empool. TxsBytes ( ) } , nil
}
}