diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index b11ced733..d84f9db26 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,6 +26,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Modify `Mempool#InitWAL` to return an error (@melekes) - [types] \#4798 Simplify `VerifyCommitTrusting` func + remove extra validation (@melekes) - [libs] \#4831 Remove `Bech32` pkg from Tendermint. This pkg now lives in the [cosmos-sdk](https://github.com/cosmos/cosmos-sdk/tree/4173ea5ebad906dd9b45325bed69b9c655504867/types/bech32) + - [node] [\#4832](https://github.com/tendermint/tendermint/pull/4832) `ConfigureRPC` returns an error (@melekes) - Blockchain Protocol diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index 5fb51f0b3..7def89a98 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/rpc/client" rpcclient "github.com/tendermint/tendermint/rpc/client" - "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" @@ -46,7 +45,7 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe } wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents)) wm.SetLogger(logger) - core.SetLogger(logger) + // core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) config := rpcserver.DefaultConfig() diff --git a/node/node.go b/node/node.go index f1e99aac0..cbc2a48fb 100644 --- a/node/node.go +++ b/node/node.go @@ -953,32 +953,42 @@ func (n *Node) OnStop() { } } -// ConfigureRPC sets all variables in rpccore so they will serve -// rpc calls from this node -func (n *Node) ConfigureRPC() { - rpccore.SetStateDB(n.stateDB) - rpccore.SetBlockStore(n.blockStore) - rpccore.SetConsensusState(n.consensusState) - rpccore.SetMempool(n.mempool) - rpccore.SetEvidencePool(n.evidencePool) - rpccore.SetP2PPeers(n.sw) - rpccore.SetP2PTransport(n) +// ConfigureRPC makes sure RPC has all the objects it needs to operate. +func (n *Node) ConfigureRPC() error { pubKey, err := n.privValidator.GetPubKey() if err != nil { - panic(err) + return fmt.Errorf("can't get pubkey: %w", err) } - rpccore.SetPubKey(pubKey) - rpccore.SetGenesisDoc(n.genesisDoc) - rpccore.SetProxyAppQuery(n.proxyApp.Query()) - rpccore.SetTxIndexer(n.txIndexer) - rpccore.SetConsensusReactor(n.consensusReactor) - rpccore.SetEventBus(n.eventBus) - rpccore.SetLogger(n.Logger.With("module", "rpc")) - rpccore.SetConfig(*n.config.RPC) + rpccore.SetEnvironment(&rpccore.Environment{ + ProxyAppQuery: n.proxyApp.Query(), + + StateDB: n.stateDB, + BlockStore: n.blockStore, + EvidencePool: n.evidencePool, + ConsensusState: n.consensusState, + P2PPeers: n.sw, + P2PTransport: n, + + PubKey: pubKey, + GenDoc: n.genesisDoc, + TxIndexer: n.txIndexer, + ConsensusReactor: n.consensusReactor, + EventBus: n.eventBus, + Mempool: n.mempool, + + Logger: n.Logger.With("module", "rpc"), + + Config: *n.config.RPC, + }) + return nil } func (n *Node) startRPC() ([]net.Listener, error) { - n.ConfigureRPC() + err := n.ConfigureRPC() + if err != nil { + return nil, err + } + listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") coreCodec := amino.NewCodec() ctypes.RegisterAmino(coreCodec) diff --git a/rpc/core/abci.go b/rpc/core/abci.go index 8f135ba26..6682b6d99 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -17,7 +17,7 @@ func ABCIQuery( height int64, prove bool, ) (*ctypes.ResultABCIQuery, error) { - resQuery, err := proxyAppQuery.QuerySync(abci.RequestQuery{ + resQuery, err := env.ProxyAppQuery.QuerySync(abci.RequestQuery{ Path: path, Data: data, Height: height, @@ -26,14 +26,14 @@ func ABCIQuery( if err != nil { return nil, err } - logger.Info("ABCIQuery", "path", path, "data", data, "result", resQuery) + env.Logger.Info("ABCIQuery", "path", path, "data", data, "result", resQuery) return &ctypes.ResultABCIQuery{Response: *resQuery}, nil } // ABCIInfo gets some info about the application. // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { - resInfo, err := proxyAppQuery.InfoSync(proxy.RequestInfo) + resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo) if err != nil { return nil, err } diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index f807bde7c..f27b1fe8e 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -17,20 +17,25 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. // maximum 20 block metas const limit int64 = 20 var err error - minHeight, maxHeight, err = filterMinMax(blockStore.Base(), blockStore.Height(), minHeight, maxHeight, limit) + minHeight, maxHeight, err = filterMinMax( + env.BlockStore.Base(), + env.BlockStore.Height(), + minHeight, + maxHeight, + limit) if err != nil { return nil, err } - logger.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) + env.Logger.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight) blockMetas := []*types.BlockMeta{} for height := maxHeight; height >= minHeight; height-- { - blockMeta := blockStore.LoadBlockMeta(height) + blockMeta := env.BlockStore.LoadBlockMeta(height) blockMetas = append(blockMetas, blockMeta) } return &ctypes.ResultBlockchainInfo{ - LastHeight: blockStore.Height(), + LastHeight: env.BlockStore.Height(), BlockMetas: blockMetas}, nil } @@ -71,13 +76,13 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) { // If no height is provided, it will fetch the latest block. // More: https://docs.tendermint.com/master/rpc/#/Info/block func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { - height, err := getHeight(blockStore.Base(), blockStore.Height(), heightPtr) + height, err := getHeight(env.BlockStore.Base(), env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } - block := blockStore.LoadBlock(height) - blockMeta := blockStore.LoadBlockMeta(height) + block := env.BlockStore.LoadBlock(height) + blockMeta := env.BlockStore.LoadBlockMeta(height) if blockMeta == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } @@ -87,12 +92,12 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) // BlockByHash gets block by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { - block := blockStore.LoadBlockByHash(hash) + block := env.BlockStore.LoadBlockByHash(hash) if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } // If block is not nil, then blockMeta can't be nil. - blockMeta := blockStore.LoadBlockMeta(block.Height) + blockMeta := env.BlockStore.LoadBlockMeta(block.Height) return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } @@ -100,12 +105,12 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error // If no height is provided, it will fetch the commit for the latest block. // More: https://docs.tendermint.com/master/rpc/#/Info/commit func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { - height, err := getHeight(blockStore.Base(), blockStore.Height(), heightPtr) + height, err := getHeight(env.BlockStore.Base(), env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } - blockMeta := blockStore.LoadBlockMeta(height) + blockMeta := env.BlockStore.LoadBlockMeta(height) if blockMeta == nil { return nil, nil } @@ -113,13 +118,13 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro // If the next block has not been committed yet, // use a non-canonical commit - if height == blockStore.Height() { - commit := blockStore.LoadSeenCommit(height) + if height == env.BlockStore.Height() { + commit := env.BlockStore.LoadSeenCommit(height) return ctypes.NewResultCommit(&header, commit, false), nil } // Return the canonical commit (comes from the block at height+1) - commit := blockStore.LoadBlockCommit(height) + commit := env.BlockStore.LoadBlockCommit(height) return ctypes.NewResultCommit(&header, commit, true), nil } @@ -131,12 +136,12 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro // getBlock(h).Txs[5] // More: https://docs.tendermint.com/master/rpc/#/Info/block_results func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { - height, err := getHeight(blockStore.Base(), blockStore.Height(), heightPtr) + height, err := getHeight(env.BlockStore.Base(), env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } - results, err := sm.LoadABCIResponses(stateDB, height) + results, err := sm.LoadABCIResponses(env.StateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index c0561647f..be743a6a1 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -79,9 +79,10 @@ func TestBlockResults(t *testing.T) { BeginBlock: &abci.ResponseBeginBlock{}, } - stateDB = dbm.NewMemDB() - sm.SaveABCIResponses(stateDB, 100, results) - blockStore = mockBlockStore{height: 100} + env = &Environment{} + env.StateDB = dbm.NewMemDB() + sm.SaveABCIResponses(env.StateDB, 100, results) + env.BlockStore = mockBlockStore{height: 100} testCases := []struct { height int64 diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 10717c8c6..14291e00c 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -19,13 +19,13 @@ import ( func Validators(ctx *rpctypes.Context, heightPtr *int64, page, perPage int) (*ctypes.ResultValidators, error) { // The latest validator that we know is the // NextValidator of the last block. - height := consensusState.GetState().LastBlockHeight + 1 - height, err := getHeight(blockStore.Base(), height, heightPtr) + height := env.ConsensusState.GetState().LastBlockHeight + 1 + height, err := getHeight(env.BlockStore.Base(), height, heightPtr) if err != nil { return nil, err } - validators, err := sm.LoadValidators(stateDB, height) + validators, err := sm.LoadValidators(env.StateDB, height) if err != nil { return nil, err } @@ -53,7 +53,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, page, perPage int) (*ct // More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. - peers := p2pPeers.Peers().List() + peers := env.P2PPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) for i, peer := range peers { peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState) @@ -72,7 +72,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState } } // Get self round state. - roundState, err := consensusState.GetRoundStateJSON() + roundState, err := env.ConsensusState.GetRoundStateJSON() if err != nil { return nil, err } @@ -86,7 +86,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState // More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { // Get self round state. - bz, err := consensusState.GetRoundStateSimpleJSON() + bz, err := env.ConsensusState.GetRoundStateSimpleJSON() return &ctypes.ResultConsensusState{RoundState: bz}, err } @@ -94,13 +94,13 @@ func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) // If no height is provided, it will fetch the current consensus params. // More: https://docs.tendermint.com/master/rpc/#/Info/consensus_params func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { - height := consensusState.GetState().LastBlockHeight + 1 - height, err := getHeight(blockStore.Base(), height, heightPtr) + height := env.ConsensusState.GetState().LastBlockHeight + 1 + height, err := getHeight(env.BlockStore.Base(), height, heightPtr) if err != nil { return nil, err } - consensusparams, err := sm.LoadConsensusParams(stateDB, height) + consensusparams, err := sm.LoadConsensusParams(env.StateDB, height) if err != nil { return nil, err } diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 71f284f89..0f05a9bbe 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -10,7 +10,7 @@ import ( // UnsafeFlushMempool removes all transactions from the mempool. func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { - mempool.Flush() + env.Mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/env.go similarity index 59% rename from rpc/core/pipe.go rename to rpc/core/env.go index 4fb3b9b13..ab6114b67 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/env.go @@ -28,6 +28,17 @@ const ( SubscribeTimeout = 5 * time.Second ) +var ( + // set by Node + env *Environment +) + +// SetEnvironment sets up the given Environment. +// It will race if multiple Node call SetEnvironment. +func SetEnvironment(e *Environment) { + env = e +} + //---------------------------------------------- // These interfaces are used by RPC and must be thread safe @@ -52,94 +63,34 @@ type peers interface { } //---------------------------------------------- -// These package level globals come with setters -// that are expected to be called only once, on startup - -var ( +// Environment contains objects and interfaces used by the RPC. It is expected +// to be setup once during startup. +type Environment struct { // external, thread safe interfaces - proxyAppQuery proxy.AppConnQuery + ProxyAppQuery proxy.AppConnQuery // interfaces defined in types and above - stateDB dbm.DB - blockStore sm.BlockStore - evidencePool sm.EvidencePool - consensusState Consensus - p2pPeers peers - p2pTransport transport + StateDB dbm.DB + BlockStore sm.BlockStore + EvidencePool sm.EvidencePool + ConsensusState Consensus + P2PPeers peers + P2PTransport transport // objects - pubKey crypto.PubKey - genDoc *types.GenesisDoc // cache the genesis structure - txIndexer txindex.TxIndexer - consensusReactor *consensus.Reactor - eventBus *types.EventBus // thread safe - mempool mempl.Mempool - - logger log.Logger - - config cfg.RPCConfig -) + PubKey crypto.PubKey + GenDoc *types.GenesisDoc // cache the genesis structure + TxIndexer txindex.TxIndexer + ConsensusReactor *consensus.Reactor + EventBus *types.EventBus // thread safe + Mempool mempl.Mempool -func SetStateDB(db dbm.DB) { - stateDB = db -} - -func SetBlockStore(bs sm.BlockStore) { - blockStore = bs -} - -func SetMempool(mem mempl.Mempool) { - mempool = mem -} - -func SetEvidencePool(evpool sm.EvidencePool) { - evidencePool = evpool -} - -func SetConsensusState(cs Consensus) { - consensusState = cs -} + Logger log.Logger -func SetP2PPeers(p peers) { - p2pPeers = p + Config cfg.RPCConfig } -func SetP2PTransport(t transport) { - p2pTransport = t -} - -func SetPubKey(pk crypto.PubKey) { - pubKey = pk -} - -func SetGenesisDoc(doc *types.GenesisDoc) { - genDoc = doc -} - -func SetProxyAppQuery(appConn proxy.AppConnQuery) { - proxyAppQuery = appConn -} - -func SetTxIndexer(indexer txindex.TxIndexer) { - txIndexer = indexer -} - -func SetConsensusReactor(conR *consensus.Reactor) { - consensusReactor = conR -} - -func SetLogger(l log.Logger) { - logger = l -} - -func SetEventBus(b *types.EventBus) { - eventBus = b -} - -// SetConfig sets an RPCConfig. -func SetConfig(c cfg.RPCConfig) { - config = c -} +//---------------------------------------------- func validatePage(page, perPage, totalCount int) (int, error) { if perPage < 1 { diff --git a/rpc/core/pipe_test.go b/rpc/core/env_test.go similarity index 99% rename from rpc/core/pipe_test.go rename to rpc/core/env_test.go index 93aff3e58..f9d408491 100644 --- a/rpc/core/pipe_test.go +++ b/rpc/core/env_test.go @@ -8,7 +8,6 @@ import ( ) func TestPaginationPage(t *testing.T) { - cases := []struct { totalCount int perPage int diff --git a/rpc/core/events.go b/rpc/core/events.go index 590037e00..f7a8fee03 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -20,13 +20,13 @@ const ( func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { addr := ctx.RemoteAddr() - if eventBus.NumClients() >= config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) - } else if eventBus.NumClientSubscriptions(addr) >= config.MaxSubscriptionsPerClient { - return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) + if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + } else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } - logger.Info("Subscribe to query", "remote", addr, "query", query) + env.Logger.Info("Subscribe to query", "remote", addr, "query", query) q, err := tmquery.New(query) if err != nil { @@ -36,7 +36,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() - sub, err := eventBus.Subscribe(subCtx, addr, q, subBufferSize) + sub, err := env.EventBus.Subscribe(subCtx, addr, q, subBufferSize) if err != nil { return nil, err } @@ -80,12 +80,12 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() - logger.Info("Unsubscribe from query", "remote", addr, "query", query) + env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query) q, err := tmquery.New(query) if err != nil { return nil, fmt.Errorf("failed to parse query: %w", err) } - err = eventBus.Unsubscribe(context.Background(), addr, q) + err = env.EventBus.Unsubscribe(context.Background(), addr, q) if err != nil { return nil, err } @@ -96,8 +96,8 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() - logger.Info("Unsubscribe from all", "remote", addr) - err := eventBus.UnsubscribeAll(context.Background(), addr) + env.Logger.Info("Unsubscribe from all", "remote", addr) + err := env.EventBus.UnsubscribeAll(context.Background(), addr) if err != nil { return nil, err } diff --git a/rpc/core/evidence.go b/rpc/core/evidence.go index 04cc68380..e559c9c8a 100644 --- a/rpc/core/evidence.go +++ b/rpc/core/evidence.go @@ -15,7 +15,7 @@ func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.Result return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err) } - if err := evidencePool.AddEvidence(ev); err != nil { + if err := env.EvidencePool.AddEvidence(ev); err != nil { return nil, fmt.Errorf("failed to add evidence: %w", err) } return &ctypes.ResultBroadcastEvidence{Hash: ev.Hash()}, nil diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 349cd1012..c8d6862cf 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -20,7 +20,7 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) + err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { return nil, err @@ -33,7 +33,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) - err := mempool.CheckTx(tx, func(res *abci.Response) { + err := env.Mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res }, mempl.TxInfo{}) if err != nil { @@ -55,31 +55,31 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() - if eventBus.NumClients() >= config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) - } else if eventBus.NumClientSubscriptions(subscriber) >= config.MaxSubscriptionsPerClient { - return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) + if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + } else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } // Subscribe to tx being committed in block. subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() q := types.EventQueryTxFor(tx) - deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q) + deliverTxSub, err := env.EventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = fmt.Errorf("failed to subscribe to tx: %w", err) - logger.Error("Error on broadcast_tx_commit", "err", err) + env.Logger.Error("Error on broadcast_tx_commit", "err", err) return nil, err } - defer eventBus.Unsubscribe(context.Background(), subscriber, q) + defer env.EventBus.Unsubscribe(context.Background(), subscriber, q) // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) - err = mempool.CheckTx(tx, func(res *abci.Response) { + err = env.Mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res }, mempl.TxInfo{}) if err != nil { - logger.Error("Error on broadcastTxCommit", "err", err) + env.Logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } checkTxResMsg := <-checkTxResCh @@ -110,15 +110,15 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc reason = deliverTxSub.Err().Error() } err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) - logger.Error("Error on broadcastTxCommit", "err", err) + env.Logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, err - case <-time.After(config.TimeoutBroadcastTxCommit): + case <-time.After(env.Config.TimeoutBroadcastTxCommit): err = errors.New("timed out waiting for tx to be included in a block") - logger.Error("Error on broadcastTxCommit", "err", err) + env.Logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, @@ -134,11 +134,11 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed // reuse per_page validator limit = validatePerPage(limit) - txs := mempool.ReapMaxTxs(limit) + txs := env.Mempool.ReapMaxTxs(limit) return &ctypes.ResultUnconfirmedTxs{ Count: len(txs), - Total: mempool.Size(), - TotalBytes: mempool.TxsBytes(), + Total: env.Mempool.Size(), + TotalBytes: env.Mempool.TxsBytes(), Txs: txs}, nil } @@ -146,7 +146,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmed // More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{ - Count: mempool.Size(), - Total: mempool.Size(), - TotalBytes: mempool.TxsBytes()}, nil + Count: env.Mempool.Size(), + Total: env.Mempool.Size(), + TotalBytes: env.Mempool.TxsBytes()}, nil } diff --git a/rpc/core/net.go b/rpc/core/net.go index 5c83a8eb8..2242d1709 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -12,7 +12,7 @@ import ( // NetInfo returns network info. // More: https://docs.tendermint.com/master/rpc/#/Info/net_info func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { - peersList := p2pPeers.Peers().List() + peersList := env.P2PPeers.Peers().List() peers := make([]ctypes.Peer, 0, len(peersList)) for _, peer := range peersList { nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo) @@ -30,8 +30,8 @@ func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { // PRO: useful info // CON: privacy return &ctypes.ResultNetInfo{ - Listening: p2pTransport.IsListening(), - Listeners: p2pTransport.Listeners(), + Listening: env.P2PTransport.IsListening(), + Listeners: env.P2PTransport.Listeners(), NPeers: len(peers), Peers: peers, }, nil @@ -42,8 +42,8 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("no seeds provided") } - logger.Info("DialSeeds", "seeds", seeds) - if err := p2pPeers.DialPeersAsync(seeds); err != nil { + env.Logger.Info("DialSeeds", "seeds", seeds) + if err := env.P2PPeers.DialPeersAsync(seeds); err != nil { return &ctypes.ResultDialSeeds{}, err } return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil @@ -55,13 +55,13 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*c if len(peers) == 0 { return &ctypes.ResultDialPeers{}, errors.New("no peers provided") } - logger.Info("DialPeers", "peers", peers, "persistent", persistent) + env.Logger.Info("DialPeers", "peers", peers, "persistent", persistent) if persistent { - if err := p2pPeers.AddPersistentPeers(peers); err != nil { + if err := env.P2PPeers.AddPersistentPeers(peers); err != nil { return &ctypes.ResultDialPeers{}, err } } - if err := p2pPeers.DialPeersAsync(peers); err != nil { + if err := env.P2PPeers.DialPeersAsync(peers); err != nil { return &ctypes.ResultDialPeers{}, err } return &ctypes.ResultDialPeers{Log: "Dialing peers in progress. See /net_info for details"}, nil @@ -70,5 +70,5 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*c // Genesis returns genesis file. // More: https://docs.tendermint.com/master/rpc/#/Info/genesis func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { - return &ctypes.ResultGenesis{Genesis: genDoc}, nil + return &ctypes.ResultGenesis{Genesis: env.GenDoc}, nil } diff --git a/rpc/core/net_test.go b/rpc/core/net_test.go index 651e1f69d..59271fdc4 100644 --- a/rpc/core/net_test.go +++ b/rpc/core/net_test.go @@ -19,8 +19,8 @@ func TestUnsafeDialSeeds(t *testing.T) { require.NoError(t, err) defer sw.Stop() - logger = log.TestingLogger() - p2pPeers = sw + env.Logger = log.TestingLogger() + env.P2PPeers = sw testCases := []struct { seeds []string @@ -49,8 +49,8 @@ func TestUnsafeDialPeers(t *testing.T) { require.NoError(t, err) defer sw.Stop() - logger = log.TestingLogger() - p2pPeers = sw + env.Logger = log.TestingLogger() + env.P2PPeers = sw testCases := []struct { peers []string diff --git a/rpc/core/status.go b/rpc/core/status.go index 67c43ea0d..25a8eb51d 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -22,8 +22,8 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { earliestAppHash tmbytes.HexBytes earliestBlockTimeNano int64 ) - earliestBlockHeight := blockStore.Base() - earliestBlockMeta = blockStore.LoadBlockMeta(earliestBlockHeight) + earliestBlockHeight := env.BlockStore.Base() + earliestBlockMeta = env.BlockStore.LoadBlockMeta(earliestBlockHeight) if earliestBlockMeta != nil { earliestAppHash = earliestBlockMeta.Header.AppHash earliestBlockHash = earliestBlockMeta.BlockID.Hash @@ -31,10 +31,10 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { } var latestHeight int64 - if consensusReactor.WaitSync() { - latestHeight = blockStore.Height() + if env.ConsensusReactor.WaitSync() { + latestHeight = env.BlockStore.Height() } else { - latestHeight = consensusState.GetLastHeight() + latestHeight = env.ConsensusState.GetLastHeight() } var ( @@ -44,7 +44,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { latestBlockTimeNano int64 ) if latestHeight != 0 { - latestBlockMeta = blockStore.LoadBlockMeta(latestHeight) + latestBlockMeta = env.BlockStore.LoadBlockMeta(latestHeight) latestBlockHash = latestBlockMeta.BlockID.Hash latestAppHash = latestBlockMeta.Header.AppHash latestBlockTimeNano = latestBlockMeta.Header.Time.UnixNano() @@ -56,7 +56,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { } result := &ctypes.ResultStatus{ - NodeInfo: p2pTransport.NodeInfo().(p2p.DefaultNodeInfo), + NodeInfo: env.P2PTransport.NodeInfo().(p2p.DefaultNodeInfo), SyncInfo: ctypes.SyncInfo{ LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, @@ -66,11 +66,11 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { EarliestAppHash: earliestAppHash, EarliestBlockHeight: earliestBlockHeight, EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - CatchingUp: consensusReactor.WaitSync(), + CatchingUp: env.ConsensusReactor.WaitSync(), }, ValidatorInfo: ctypes.ValidatorInfo{ - Address: pubKey.Address(), - PubKey: pubKey, + Address: env.PubKey.Address(), + PubKey: env.PubKey, VotingPower: votingPower, }, } @@ -79,10 +79,10 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { } func validatorAtHeight(h int64) *types.Validator { - privValAddress := pubKey.Address() + privValAddress := env.PubKey.Address() // If we're still at height h, search in the current validator set. - lastBlockHeight, vals := consensusState.GetValidators() + lastBlockHeight, vals := env.ConsensusState.GetValidators() if lastBlockHeight == h { for _, val := range vals { if bytes.Equal(val.Address, privValAddress) { @@ -93,7 +93,7 @@ func validatorAtHeight(h int64) *types.Validator { // If we've moved to the next height, retrieve the validator set from DB. if lastBlockHeight > h { - vals, err := sm.LoadValidators(stateDB, h) + vals, err := sm.LoadValidators(env.StateDB, h) if err != nil { return nil // should not happen } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 5a8c68443..9fb549b3b 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -19,11 +19,11 @@ import ( // More: https://docs.tendermint.com/master/rpc/#/Info/tx func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error - if _, ok := txIndexer.(*null.TxIndex); ok { + if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("transaction indexing is disabled") } - r, err := txIndexer.Get(hash) + r, err := env.TxIndexer.Get(hash) if err != nil { return nil, err } @@ -37,7 +37,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error var proof types.TxProof if prove { - block := blockStore.LoadBlock(height) + block := env.BlockStore.LoadBlock(height) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } @@ -57,7 +57,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int, orderBy string) ( *ctypes.ResultTxSearch, error) { // if index is disabled, return error - if _, ok := txIndexer.(*null.TxIndex); ok { + if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, errors.New("transaction indexing is disabled") } @@ -66,7 +66,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int return nil, err } - results, err := txIndexer.Search(ctx.Context(), q) + results, err := env.TxIndexer.Search(ctx.Context(), q) if err != nil { return nil, err } @@ -107,7 +107,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int var proof types.TxProof if prove { - block := blockStore.LoadBlock(r.Height) + block := env.BlockStore.LoadBlock(r.Height) proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines }