From 63ff2f052d1706b35937d747cce73d59d5b0883e Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 3 Mar 2022 10:03:38 -0800 Subject: [PATCH 1/4] Remove now-unused and deprecated Subscribe methods. (#8064) Both pubsub and eventbus are internal packages now, and all the existing use has been updated to use the SubscribeWithArgs methods instead. --- internal/eventbus/event_bus.go | 7 ------- internal/pubsub/pubsub.go | 20 -------------------- 2 files changed, 27 deletions(-) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index 2a7c032b3..1d2d510e3 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -50,13 +50,6 @@ func (b *EventBus) NumClientSubscriptions(clientID string) int { return b.pubsub.NumClientSubscriptions(clientID) } -// Deprecated: Use SubscribeWithArgs instead. -func (b *EventBus) Subscribe(ctx context.Context, - clientID string, query *tmquery.Query, capacities ...int) (Subscription, error) { - - return b.pubsub.Subscribe(ctx, clientID, query, capacities...) -} - func (b *EventBus) SubscribeWithArgs(ctx context.Context, args tmpubsub.SubscribeArgs) (Subscription, error) { return b.pubsub.SubscribeWithArgs(ctx, args) } diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index 707f9cb13..df2dd90e3 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -153,26 +153,6 @@ func BufferCapacity(cap int) Option { // BufferCapacity returns capacity of the publication queue. func (s *Server) BufferCapacity() int { return cap(s.queue) } -// Subscribe creates a subscription for the given client ID and query. -// If len(capacities) > 0, its first value is used as the queue capacity. -// -// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36. -func (s *Server) Subscribe(ctx context.Context, clientID string, query *query.Query, capacities ...int) (*Subscription, error) { - args := SubscribeArgs{ - ClientID: clientID, - Query: query, - Limit: 1, - } - if len(capacities) > 0 { - args.Limit = capacities[0] - if len(capacities) > 1 { - args.Quota = capacities[1] - } - // bounds are checked below - } - return s.SubscribeWithArgs(ctx, args) -} - // Observe registers an observer function that will be called synchronously // with each published message matching any of the given queries, prior to it // being forwarded to any subscriber. 
If no queries are specified, all From 9d984848453aa7f90460cac4ce4b89fe52eb94ed Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 3 Mar 2022 15:17:45 -0500 Subject: [PATCH 2/4] node: excise node handle within rpc env (#8063) --- internal/rpc/core/env.go | 15 +++++++-------- internal/rpc/core/net.go | 4 ++-- internal/rpc/core/status.go | 2 +- node/node.go | 23 +++-------------------- 4 files changed, 13 insertions(+), 31 deletions(-) diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 5a718b232..8f590298b 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -57,12 +57,6 @@ type consensusState interface { GetRoundStateSimpleJSON() ([]byte, error) } -type transport interface { - Listeners() []string - IsListening() bool - NodeInfo() types.NodeInfo -} - type peerManager interface { Peers() []types.NodeID Addresses(types.NodeID) []p2p.NodeAddress @@ -84,8 +78,9 @@ type Environment struct { ConsensusReactor *consensus.Reactor BlockSyncReactor *blocksync.Reactor - // Legacy p2p stack - P2PTransport transport + IsListening bool + Listeners []string + NodeInfo types.NodeInfo // interfaces for new p2p interfaces PeerManager peerManager @@ -226,6 +221,10 @@ func (env *Environment) StartService(ctx context.Context, conf *config.Config) ( return nil, err } + env.Listeners = []string{ + fmt.Sprintf("Listener(@%v)", conf.P2P.ExternalAddress), + } + listenAddrs := strings.SplitAndTrimEmpty(conf.RPC.ListenAddress, ",", " ") routes := NewRoutesMap(env, &RouteOptions{ Unsafe: conf.RPC.Unsafe, diff --git a/internal/rpc/core/net.go b/internal/rpc/core/net.go index 3cead393c..5444b77b7 100644 --- a/internal/rpc/core/net.go +++ b/internal/rpc/core/net.go @@ -27,8 +27,8 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, } return &coretypes.ResultNetInfo{ - Listening: env.P2PTransport.IsListening(), - Listeners: env.P2PTransport.Listeners(), + Listening: env.IsListening, + Listeners: env.Listeners, NPeers: len(peers), Peers: peers, }, nil diff --git a/internal/rpc/core/status.go b/internal/rpc/core/status.go index 2f648978a..46b8a6fcd 100644 --- a/internal/rpc/core/status.go +++ b/internal/rpc/core/status.go @@ -66,7 +66,7 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er } result := &coretypes.ResultStatus{ - NodeInfo: env.P2PTransport.NodeInfo(), + NodeInfo: env.NodeInfo, ApplicationInfo: applicationInfo, SyncInfo: coretypes.SyncInfo{ LatestBlockHash: latestBlockHash, diff --git a/node/node.go b/node/node.go index 7d7b75170..8970a29d7 100644 --- a/node/node.go +++ b/node/node.go @@ -58,7 +58,6 @@ type nodeImpl struct { router *p2p.Router nodeInfo types.NodeInfo nodeKey types.NodeKey // our node privkey - isListening bool // services eventSinks []indexer.EventSink @@ -421,8 +420,6 @@ func makeNode( node.rpcEnv.PubKey = pubKey } - node.rpcEnv.P2PTransport = node - node.BaseService = *service.NewBaseService(logger, "Node", node) return node, nil @@ -467,6 +464,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error { } } + n.rpcEnv.NodeInfo = n.nodeInfo // Start the RPC server before the P2P server // so we can eg. 
receive txs for the first block
 	if n.config.RPC.ListenAddress != "" {
@@ -485,7 +483,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 	if err := n.router.Start(ctx); err != nil {
 		return err
 	}
-	n.isListening = true
+	n.rpcEnv.IsListening = true
 
 	for _, reactor := range n.services {
 		if err := reactor.Start(ctx); err != nil {
@@ -580,7 +578,7 @@ func (n *nodeImpl) OnStop() {
 	n.stateSyncReactor.Wait()
 	n.router.Wait()
 
-	n.isListening = false
+	n.rpcEnv.IsListening = false
 
 	// finally stop the listeners / external services
 	for _, l := range n.rpcListeners {
@@ -669,21 +667,6 @@ func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
 
 //------------------------------------------------------------------------------
 
-func (n *nodeImpl) Listeners() []string {
-	return []string{
-		fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
-	}
-}
-
-func (n *nodeImpl) IsListening() bool {
-	return n.isListening
-}
-
-// NodeInfo returns the Node's Info from the Switch.
-func (n *nodeImpl) NodeInfo() types.NodeInfo {
-	return n.nodeInfo
-}
-
 // genesisDocProvider returns a GenesisDoc.
 // It allows the GenesisDoc to be pulled from sources other than the
 // filesystem, for instance from a distributed key-value store cluster.

From c8c248d7336bb4b087cc8b7e8c8aa8f31d2b0b5f Mon Sep 17 00:00:00 2001
From: William Banfield <4561443+williambanfield@users.noreply.github.com>
Date: Thu, 3 Mar 2022 17:25:06 -0500
Subject: [PATCH 3/4] docs: add an overview of the proposer-based timestamps algorithm (#8058)

This change adds an overview of the proposer-based timestamps algorithm. The
goal of this documentation is to give a plain enough explanation of the
algorithm so that application developers and validators can understand both
the utility of the algorithm and how the new constraints may affect their
network and topology.

I'm blanking on the scheme we decided on for docs linking, so if anyone could
remind me what link format we decided on, I'll go clean that up ASAP.

Once this is merged, I intend to create a runbook for chains that see slower
block-times or higher nil prevotes and link that runbook to this document to
provide a higher-level overview.

closes: #8046
---
 .../consensus/proposer-based-timestamps.md | 94 +++++++++++++++++++
 spec/abci/apps.md                          | 13 +++
 2 files changed, 107 insertions(+)
 create mode 100644 docs/tendermint-core/consensus/proposer-based-timestamps.md

diff --git a/docs/tendermint-core/consensus/proposer-based-timestamps.md b/docs/tendermint-core/consensus/proposer-based-timestamps.md
new file mode 100644
index 000000000..def42dc20
--- /dev/null
+++ b/docs/tendermint-core/consensus/proposer-based-timestamps.md
@@ -0,0 +1,94 @@
+---
+order: 3
+---
+
+# PBTS
+
+This document provides an overview of the Proposer-Based Timestamp (PBTS)
+algorithm added to Tendermint in the v0.36 release. It outlines the core
+functionality as well as the parameters and constraints of this algorithm.
+
+## Algorithm Overview
+
+The PBTS algorithm defines a way for a Tendermint blockchain to create block
+timestamps that are within a reasonable bound of the clocks of the validators on
+the network. This replaces the original BFTTime algorithm for timestamp
+assignment that relied on the timestamps included in precommit messages.
+
+## Algorithm Parameters
+
+The functionality of the PBTS algorithm is governed by two parameters within
+Tendermint.
+These two parameters are [consensus
+parameters](https://github.com/tendermint/tendermint/blob/master/spec/abci/apps.md#L291),
+meaning they are configured by the ABCI application and are expected to be the
+same across all nodes on the network.
+
+### `Precision`
+
+The `Precision` parameter configures the acceptable upper-bound of clock drift
+among all of the nodes on a Tendermint network. Any two nodes on a Tendermint
+network are expected to have clocks that differ by at most `Precision`
+milliseconds at any given instant.
+
+### `MessageDelay`
+
+The `MessageDelay` parameter configures the acceptable upper-bound for
+transmitting a `Proposal` message from the proposer to _all_ of the validators
+on the network.
+
+Networks should choose as small a value for `MessageDelay` as is practical,
+provided it is large enough that messages can reach all participants with high
+probability given the number of participants and latency of their connections.
+
+## Algorithm Concepts
+
+### Block timestamps
+
+Each block produced by the Tendermint consensus engine contains a timestamp.
+The timestamp produced in each block is a meaningful representation of time that is
+useful for the protocols and applications built on top of Tendermint.
+
+The following protocols and application features require a reliable source of time:
+
+* Tendermint Light Clients [rely on correspondence between their known time](https://github.com/tendermint/tendermint/blob/master/spec/light-client/verification/README.md#definitions-1) and the block time for block verification.
+* Tendermint Evidence validity is determined [either in terms of heights or in terms of time](https://github.com/tendermint/tendermint/blob/master/spec/consensus/evidence.md#verification).
+* Unbonding of staked assets in the Cosmos Hub [occurs after a period of 21
+  days](https://github.com/cosmos/governance/blob/master/params-change/Staking.md#unbondingtime).
+* IBC packets can use either a [timestamp or a height to timeout packet
+  delivery](https://docs.cosmos.network/v0.44/ibc/overview.html#acknowledgements).
+
+### Proposer Selects a Block Timestamp
+
+When the proposer node creates a new block proposal, the node reads the time
+from its local clock and uses this reading as the timestamp for the proposed
+block.
+
+### Timeliness
+
+When each validator on a Tendermint network receives a proposed block, it
+performs a series of checks to ensure that the block can be considered valid as
+a candidate to be the next block in the chain.
+
+The PBTS algorithm performs a validity check on the timestamp of proposed
+blocks. When a validator receives a proposal, it ensures that the timestamp in
+the proposal is within a bound of the validator's local clock. Specifically, the
+algorithm checks that the timestamp is no more than `Precision` ahead of the
+node's local clock and no more than `Precision` + `MessageDelay` behind the
+node's local clock. This creates a range of acceptable timestamps around the
+node's local time. If the timestamp is within this range, the PBTS algorithm
+considers the block **timely**. If a block is not **timely**, the node will
+issue a `nil` `prevote` for this block, signaling to the rest of the network
+that the node does not consider the block to be valid.
+
+### Clock Synchronization
+
+The PBTS algorithm requires that the clocks of the validators on a Tendermint
+network be within `Precision` of each other. In practice, this means that
+validators should periodically synchronize to a reliable NTP server.
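+
+To make the acceptance window described under Timeliness concrete, the
+following is a minimal sketch of the check, not the actual Tendermint
+implementation: the `isTimely` helper and the `precision` and `messageDelay`
+values are hypothetical stand-ins for the network's `SynchronyParams`.
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// isTimely reports whether a proposed block timestamp falls inside the PBTS
+// acceptance window around the validator's local clock: no more than
+// precision ahead of it, and no more than messageDelay+precision behind it.
+func isTimely(proposalTime, localTime time.Time, precision, messageDelay time.Duration) bool {
+	lowerBound := localTime.Add(-(messageDelay + precision))
+	upperBound := localTime.Add(precision)
+	return !proposalTime.Before(lowerBound) && !proposalTime.After(upperBound)
+}
+
+func main() {
+	precision := 500 * time.Millisecond // hypothetical SynchronyParams.Precision
+	messageDelay := 2 * time.Second     // hypothetical SynchronyParams.MessageDelay
+	local := time.Now()
+
+	fmt.Println(isTimely(local.Add(-1*time.Second), local, precision, messageDelay)) // true: inside the window
+	fmt.Println(isTimely(local.Add(5*time.Second), local, precision, messageDelay))  // false: too far ahead of the local clock
+}
+```
+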
+Validators that drift too far away from the rest of the network will no longer
+propose blocks with valid timestamps. Additionally, they will not consider the
+timestamps of blocks proposed by their peers to be valid either.
+
+## See Also
+
+* [The PBTS specification](https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md)
+  contains all of the details of the algorithm.
+
diff --git a/spec/abci/apps.md b/spec/abci/apps.md
index 030a3d3c3..d6ec19832 100644
--- a/spec/abci/apps.md
+++ b/spec/abci/apps.md
@@ -346,6 +346,19 @@ a block minus it's overhead ( ~ `MaxBytes`).
 
 Must have `MaxNum > 0`.
 
+### SynchronyParams.Precision
+
+`SynchronyParams.Precision` is a parameter of the Proposer-Based Timestamps
+algorithm that configures the acceptable upper-bound of clock drift among
+all of the nodes on a Tendermint network. Any two nodes on a Tendermint network
+are expected to have clocks that differ by at most `Precision`.
+
+### SynchronyParams.MessageDelay
+
+`SynchronyParams.MessageDelay` is a parameter of the Proposer-Based Timestamps
+algorithm that configures the acceptable upper-bound for transmitting a `Proposal`
+message from the proposer to all of the validators on the network.
+
 ### Updates
 
 The application may set the ConsensusParams during InitChain, and update them during

From 0167f0d527d363e86ec27a58d9b7b4be56132b1b Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Fri, 4 Mar 2022 12:23:57 -0500
Subject: [PATCH 4/4] node: nodes should fetch state on startup (#8062)

---
 internal/blocksync/reactor.go        | 44 ++++++++-------
 internal/blocksync/reactor_test.go   |  2 +-
 internal/consensus/byzantine_test.go | 32 +++++++----
 internal/consensus/common_test.go    |  9 ++-
 internal/consensus/reactor_test.go   |  7 ++-
 internal/consensus/replay_file.go    | 29 ++++++----
 internal/consensus/state.go          | 45 ++++++++++++---
 internal/consensus/wal_generator.go  |  6 +-
 internal/mempool/mempool.go          |  3 +-
 internal/mempool/mempool_test.go     |  2 +-
 internal/state/execution.go          |  4 +-
 internal/state/tx_filter.go          | 83 ++++++++++++++++++++++++----
 internal/state/tx_filter_test.go     |  2 +-
 node/node.go                         | 34 ++++++------
 node/node_test.go                    |  3 -
 node/setup.go                        | 20 ++++---
 test/fuzz/mempool/checktx.go         |  1 -
 17 files changed, 221 insertions(+), 105 deletions(-)

diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index f4d69b8b0..cf1a10623 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -70,6 +70,8 @@ type Reactor struct {
 	// immutable
 	initialState sm.State
+	// store
+	stateStore sm.Store
 
 	blockExec *sm.BlockExecutor
 	store     *store.BlockStore
@@ -101,7 +103,7 @@ type Reactor struct {
 func NewReactor(
 	ctx context.Context,
 	logger log.Logger,
-	state sm.State,
+	stateStore sm.Store,
 	blockExec *sm.BlockExecutor,
 	store *store.BlockStore,
 	consReactor consensusReactor,
@@ -111,19 +113,6 @@ func NewReactor(
 	metrics *consensus.Metrics,
 	eventBus *eventbus.EventBus,
 ) (*Reactor, error) {
-
-	if state.LastBlockHeight != store.Height() {
-		return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
-	}
-
-	startHeight := store.Height() + 1
-	if startHeight == 1 {
-		startHeight = state.InitialHeight
-	}
-
-	requestsCh := make(chan BlockRequest, maxTotalRequesters)
-	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
- blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor()) if err != nil { return nil, err @@ -131,20 +120,16 @@ func NewReactor( r := &Reactor{ logger: logger, - initialState: state, + stateStore: stateStore, blockExec: blockExec, store: store, - pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh), consReactor: consReactor, blockSync: newAtomicBool(blockSync), - requestsCh: requestsCh, - errorsCh: errorsCh, blockSyncCh: blockSyncCh, blockSyncOutBridgeCh: make(chan p2p.Envelope), peerUpdates: peerUpdates, metrics: metrics, eventBus: eventBus, - syncStartTime: time.Time{}, } r.BaseService = *service.NewBaseService(logger, "BlockSync", r) @@ -159,6 +144,27 @@ func NewReactor( // If blockSync is enabled, we also start the pool and the pool processing // goroutine. If the pool fails to start, an error is returned. func (r *Reactor) OnStart(ctx context.Context) error { + state, err := r.stateStore.Load() + if err != nil { + return err + } + r.initialState = state + + if state.LastBlockHeight != r.store.Height() { + return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height()) + } + + startHeight := r.store.Height() + 1 + if startHeight == 1 { + startHeight = state.InitialHeight + } + + requestsCh := make(chan BlockRequest, maxTotalRequesters) + errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. + r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh) + r.requestsCh = requestsCh + r.errorsCh = errorsCh + if r.blockSync.IsSet() { if err := r.pool.Start(ctx); err != nil { return err diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 68656fbc3..00ab14a86 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -176,7 +176,7 @@ func (rts *reactorTestSuite) addNode( rts.reactors[nodeID], err = NewReactor( ctx, rts.logger.With("nodeID", nodeID), - state.Copy(), + stateStore, blockExec, blockStore, nil, diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 33e1dbf63..f0df502f9 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -82,7 +82,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() @@ -95,7 +94,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) + cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + require.NoError(t, err) // set private validator pv := privVals[i] cs.SetPrivValidator(ctx, pv) @@ -105,14 +105,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { require.NoError(t, err) cs.SetEventBus(eventBus) evpool.SetEventBus(eventBus) - cs.SetTimeoutTicker(tickerFunc()) states[i] = cs }() } - rts := setup(ctx, t, nValidators, states, 100) // buffer must be large enough to not deadlock + rts := setup(ctx, t, nValidators, states, 512) // buffer must be large enough to not deadlock var bzNodeID types.NodeID @@ -238,8 +237,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } for _, reactor := range rts.reactors { - state := reactor.state.GetState() - 
reactor.SwitchToConsensus(ctx, state, false) + reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false) } // Evidence should be submitted and committed at the third height but @@ -248,20 +246,26 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { var wg sync.WaitGroup i := 0 + subctx, subcancel := context.WithCancel(ctx) + defer subcancel() for _, sub := range rts.subs { wg.Add(1) go func(j int, s eventbus.Subscription) { defer wg.Done() for { - if ctx.Err() != nil { + if subctx.Err() != nil { + return + } + + msg, err := s.Next(subctx) + if subctx.Err() != nil { return } - msg, err := s.Next(ctx) - assert.NoError(t, err) if err != nil { - cancel() + t.Errorf("waiting for subscription: %v", err) + subcancel() return } @@ -273,12 +277,18 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } } }(i, sub) - i++ } wg.Wait() + // don't run more assertions if we've encountered a timeout + select { + case <-subctx.Done(): + t.Fatal("encountered timeout") + default: + } + pubkey, err := bzNodeState.privValidator.GetPubKey(ctx) require.NoError(t, err) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index bd381ff7e..4a6b96b2d 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -469,7 +469,6 @@ func newStateWithConfigAndBlockStore( logger.With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { @@ -484,15 +483,19 @@ func newStateWithConfigAndBlockStore( require.NoError(t, stateStore.Save(state)) blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, + cs, err := NewState(ctx, logger.With("module", "consensus"), thisConfig.Consensus, - state, + stateStore, blockExec, blockStore, mempool, evpool, ) + if err != nil { + t.Fatal(err) + } + cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(logger.With("module", "events")) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index f01d013b3..ea9238a22 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -461,6 +461,7 @@ func TestReactorWithEvidence(t *testing.T) { stateStore := sm.NewStore(stateDB) state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) + require.NoError(t, stateStore.Save(state)) thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i)) require.NoError(t, err) @@ -483,7 +484,6 @@ func TestReactorWithEvidence(t *testing.T) { log.TestingLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, - 0, ) if thisConfig.Consensus.WaitForTxs() { @@ -506,8 +506,9 @@ func TestReactorWithEvidence(t *testing.T) { blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) - cs := NewState(ctx, logger.With("validator", i, "module", "consensus"), - thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) + cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), + thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2) + require.NoError(t, err) cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 310eb0ab6..96de5ef28 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -84,7 +84,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, 
console bool) erro return err } - pb := newPlayback(file, fp, cs, cs.state.Copy()) + pb := newPlayback(file, fp, cs, cs.stateStore) defer pb.fp.Close() var nextN int // apply N msgs in a row @@ -126,17 +126,17 @@ type playback struct { count int // how many lines/msgs into the file are we // replays can be reset to beginning - fileName string // so we can close/reopen the file - genesisState sm.State // so the replay session knows where to restart from + fileName string // so we can close/reopen the file + stateStore sm.Store } -func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *State, store sm.Store) *playback { return &playback{ - cs: cs, - fp: fp, - fileName: fileName, - genesisState: genState, - dec: NewWALDecoder(fp), + cs: cs, + fp: fp, + fileName: fileName, + stateStore: store, + dec: NewWALDecoder(fp), } } @@ -145,8 +145,11 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event pb.cs.Stop() pb.cs.Wait() - newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, + newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec, pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) + if err != nil { + return err + } newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -345,9 +348,11 @@ func newConsensusStateForReplay( mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec, + consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec, blockStore, mempool, evpool) - + if err != nil { + return nil, err + } consensusState.SetEventBus(eventBus) return consensusState, nil } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 7f2045dcd..220cc0741 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -121,6 +121,9 @@ type State struct { // store blocks and commits blockStore sm.BlockStore + stateStore sm.Store + initialStatePopulated bool + // create and execute blocks blockExec *sm.BlockExecutor @@ -189,18 +192,19 @@ func NewState( ctx context.Context, logger log.Logger, cfg *config.ConsensusConfig, - state sm.State, + store sm.Store, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, txNotifier txNotifier, evpool evidencePool, options ...StateOption, -) *State { +) (*State, error) { cs := &State{ logger: logger, config: cfg, blockExec: blockExec, blockStore: blockStore, + stateStore: store, txNotifier: txNotifier, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), @@ -220,21 +224,40 @@ func NewState( cs.doPrevote = cs.defaultDoPrevote cs.setProposal = cs.defaultSetProposal - // We have no votes, so reconstruct LastCommit from SeenCommit. 
- if state.LastBlockHeight > 0 { - cs.reconstructLastCommit(state) + if err := cs.updateStateFromStore(ctx); err != nil { + return nil, err } - cs.updateToState(ctx, state) - // NOTE: we do not call scheduleRound0 yet, we do that upon Start() - cs.BaseService = *service.NewBaseService(logger, "State", cs) for _, option := range options { option(cs) } - return cs + return cs, nil +} + +func (cs *State) updateStateFromStore(ctx context.Context) error { + if cs.initialStatePopulated { + return nil + } + state, err := cs.stateStore.Load() + if err != nil { + return fmt.Errorf("loading state: %w", err) + } + if state.IsEmpty() { + return nil + } + + // We have no votes, so reconstruct LastCommit from SeenCommit. + if state.LastBlockHeight > 0 { + cs.reconstructLastCommit(state) + } + + cs.updateToState(ctx, state) + + cs.initialStatePopulated = true + return nil } // SetEventBus sets event bus. @@ -365,6 +388,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit { // OnStart loads the latest state via the WAL, and starts the timeout and // receive routines. func (cs *State) OnStart(ctx context.Context) error { + if err := cs.updateStateFromStore(ctx); err != nil { + return err + } + // We may set the WAL in testing before calling Start, so only OpenWAL if its // still the nilWAL. if _, ok := cs.wal.(nilWAL); ok { diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index b10feb828..493ec1840 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -83,7 +83,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) - consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) + consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool) + if err != nil { + t.Fatal(err) + } + consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { consensusState.SetPrivValidator(ctx, privValidator) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 21429721d..26f039de4 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -94,7 +94,6 @@ func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, proxyAppConn proxy.AppConnMempool, - height int64, options ...TxMempoolOption, ) *TxMempool { @@ -102,7 +101,7 @@ func NewTxMempool( logger: logger, config: cfg, proxyAppConn: proxyAppConn, - height: height, + height: -1, cache: NopTxCache{}, metrics: NopMetrics(), txStore: NewTxStore(), diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index e2cf12e07..68eb5731b 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -95,7 +95,7 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo appConnMem.Wait() }) - return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) + return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...) 
} func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { diff --git a/internal/state/execution.go b/internal/state/execution.go index cdd6e009b..c67d9795a 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -360,8 +360,8 @@ func (blockExec *BlockExecutor) Commit( block.Height, block.Txs, deliverTxResponses, - TxPreCheck(state), - TxPostCheck(state), + TxPreCheckForState(state), + TxPostCheckForState(state), ) return res.Data, res.RetainHeight, err diff --git a/internal/state/tx_filter.go b/internal/state/tx_filter.go index 871e08ae6..11dd9ce67 100644 --- a/internal/state/tx_filter.go +++ b/internal/state/tx_filter.go @@ -1,22 +1,85 @@ package state import ( + "sync" + "time" + + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/types" ) -// TxPreCheck returns a function to filter transactions before processing. -// The function limits the size of a transaction to the block's maximum data size. -func TxPreCheck(state State) mempool.PreCheckFunc { - maxDataBytes := types.MaxDataBytesNoEvidence( - state.ConsensusParams.Block.MaxBytes, - state.Validators.Size(), +func cachingStateFetcher(store Store) func() (State, error) { + const ttl = time.Second + + var ( + last time.Time + mutex = &sync.Mutex{} + cache State + err error ) - return mempool.PreCheckMaxBytes(maxDataBytes) + + return func() (State, error) { + mutex.Lock() + defer mutex.Unlock() + + if time.Since(last) < ttl && cache.ChainID != "" { + return cache, nil + } + + cache, err = store.Load() + if err != nil { + return State{}, err + } + last = time.Now() + + return cache, nil + } + } -// TxPostCheck returns a function to filter transactions after processing. +// TxPreCheckFromStore returns a function to filter transactions before processing. +// The function limits the size of a transaction to the block's maximum data size. +func TxPreCheckFromStore(store Store) mempool.PreCheckFunc { + fetch := cachingStateFetcher(store) + + return func(tx types.Tx) error { + state, err := fetch() + if err != nil { + return err + } + + return TxPreCheckForState(state)(tx) + } +} + +func TxPreCheckForState(state State) mempool.PreCheckFunc { + return func(tx types.Tx) error { + maxDataBytes := types.MaxDataBytesNoEvidence( + state.ConsensusParams.Block.MaxBytes, + state.Validators.Size(), + ) + return mempool.PreCheckMaxBytes(maxDataBytes)(tx) + } + +} + +// TxPostCheckFromStore returns a function to filter transactions after processing. // The function limits the gas wanted by a transaction to the block's maximum total gas. 
-func TxPostCheck(state State) mempool.PostCheckFunc { - return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas) +func TxPostCheckFromStore(store Store) mempool.PostCheckFunc { + fetch := cachingStateFetcher(store) + + return func(tx types.Tx, resp *abci.ResponseCheckTx) error { + state, err := fetch() + if err != nil { + return err + } + return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp) + } +} + +func TxPostCheckForState(state State) mempool.PostCheckFunc { + return func(tx types.Tx, resp *abci.ResponseCheckTx) error { + return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp) + } } diff --git a/internal/state/tx_filter_test.go b/internal/state/tx_filter_test.go index 27af28a40..ac85543b2 100644 --- a/internal/state/tx_filter_test.go +++ b/internal/state/tx_filter_test.go @@ -31,7 +31,7 @@ func TestTxFilter(t *testing.T) { state, err := sm.MakeGenesisState(genDoc) require.NoError(t, err) - f := sm.TxPreCheck(state) + f := sm.TxPreCheckForState(state) if tc.isErr { assert.NotNil(t, f(tc.tx), "#%v", i) } else { diff --git a/node/node.go b/node/node.go index 8970a29d7..3b30a2853 100644 --- a/node/node.go +++ b/node/node.go @@ -143,11 +143,8 @@ func makeNode( return nil, combineCloseError(err, makeCloser(closers)) } - err = genDoc.ValidateAndComplete() - if err != nil { - return nil, combineCloseError( - fmt.Errorf("error in genesis doc: %w", err), - makeCloser(closers)) + if err = genDoc.ValidateAndComplete(); err != nil { + return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers)) } state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc) @@ -241,10 +238,6 @@ func makeNode( } } - // Determine whether we should do block sync. This must happen after the handshake, since the - // app may modify the validator set, specifying ourself as the only validator. - blockSync := !onlyValidatorIsUs(state, pubKey) - logNodeStartupInfo(state, pubKey, logger, cfg.Mode) // TODO: Fetch and provide real options and do proper p2p bootstrapping. @@ -271,14 +264,14 @@ func makeNode( } mpReactor, mp, err := createMempoolReactor(ctx, - cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger, + cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } evReactor, evPool, err := createEvidenceReactor(ctx, - cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, + cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus, ) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) @@ -295,8 +288,12 @@ func makeNode( sm.BlockExecutorWithMetrics(nodeMetrics.state), ) + // Determine whether we should do block sync. This must happen after the handshake, since the + // app may modify the validator set, specifying ourself as the only validator. + blockSync := !onlyValidatorIsUs(state, pubKey) + csReactor, csState, err := createConsensusReactor(ctx, - cfg, state, blockExec, blockStore, mp, evPool, + cfg, stateStore, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, peerManager, router, logger, ) @@ -308,7 +305,7 @@ func makeNode( // doing a state sync first. 
bcReactor, err := blocksync.NewReactor(ctx,
 		logger.With("module", "blockchain"),
-		state.Copy(),
+		stateStore,
 		blockExec,
 		blockStore,
 		csReactor,
@@ -730,10 +727,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
 // loadStateFromDBOrGenesisDocProvider attempts to load the state from the
 // database, or creates one using the given genesisDocProvider. On success this also
 // returns the genesis doc loaded through the given provider.
-func loadStateFromDBOrGenesisDocProvider(
-	stateStore sm.Store,
-	genDoc *types.GenesisDoc,
-) (sm.State, error) {
+func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) {
 
 	// 1. Attempt to load state form the database
 	state, err := stateStore.Load()
@@ -747,6 +741,12 @@ func loadStateFromDBOrGenesisDocProvider(
 		if err != nil {
 			return sm.State{}, err
 		}
+
+		// 3. save the genesis document to the state store so
+		// it's fetchable by other callers.
+		if err := stateStore.Save(state); err != nil {
+			return sm.State{}, err
+		}
 	}
 
 	return state, nil
diff --git a/node/node_test.go b/node/node_test.go
index 41cb1b6a9..5fbf80e00 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) {
 		logger.With("module", "mempool"),
 		cfg.Mempool,
 		proxyApp.Mempool(),
-		state.LastBlockHeight,
 	)
 
 	// Make EvidencePool
@@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
 		logger.With("module", "mempool"),
 		cfg.Mempool,
 		proxyApp.Mempool(),
-		state.LastBlockHeight,
 	)
 
 	// fill the mempool with one txs just below the maximum size
@@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
 		logger.With("module", "mempool"),
 		cfg.Mempool,
 		proxyApp.Mempool(),
-		state.LastBlockHeight,
 	)
 
 	// fill the mempool with one txs just below the maximum size
diff --git a/node/setup.go b/node/setup.go
index e880cd5c4..bc071f3cf 100644
--- a/node/setup.go
+++ b/node/setup.go
@@ -172,7 +172,7 @@ func createMempoolReactor(
 	ctx context.Context,
 	cfg *config.Config,
 	proxyApp proxy.AppConns,
-	state sm.State,
+	store sm.Store,
 	memplMetrics *mempool.Metrics,
 	peerManager *p2p.PeerManager,
 	router *p2p.Router,
@@ -184,10 +184,9 @@ func createMempoolReactor(
 		logger,
 		cfg.Mempool,
 		proxyApp.Mempool(),
-		state.LastBlockHeight,
 		mempool.WithMetrics(memplMetrics),
-		mempool.WithPreCheck(sm.TxPreCheck(state)),
-		mempool.WithPostCheck(sm.TxPostCheck(state)),
+		mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
+		mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
 	)
 
 	reactor, err := mempool.NewReactor(
@@ -214,7 +213,7 @@ func createEvidenceReactor(
 	ctx context.Context,
 	cfg *config.Config,
 	dbProvider config.DBProvider,
-	stateDB dbm.DB,
+	store sm.Store,
 	blockStore *store.BlockStore,
 	peerManager *p2p.PeerManager,
 	router *p2p.Router,
@@ -229,7 +228,7 @@ func createEvidenceReactor(
 
 	logger = logger.With("module", "evidence")
 
-	evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
+	evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
 	if err != nil {
 		return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
 	}
@@ -253,7 +252,7 @@ func createEvidenceReactor(
 func createConsensusReactor(
 	ctx context.Context,
 	cfg *config.Config,
-	state sm.State,
+	store sm.Store,
 	blockExec *sm.BlockExecutor,
 	blockStore sm.BlockStore,
 	mp mempool.Mempool,
@@ -268,16 +267,19 @@ func createConsensusReactor(
 ) (*consensus.Reactor, *consensus.State, error) {
 	logger = logger.With("module", "consensus")
 
-
consensusState := consensus.NewState(ctx, + consensusState, err := consensus.NewState(ctx, logger, cfg.Consensus, - state.Copy(), + store, blockExec, blockStore, mp, evidencePool, consensus.StateMetrics(csMetrics), ) + if err != nil { + return nil, nil, err + } if privValidator != nil && cfg.Mode == config.ModeValidator { consensusState.SetPrivValidator(ctx, privValidator) diff --git a/test/fuzz/mempool/checktx.go b/test/fuzz/mempool/checktx.go index ba60d72cc..a6e7006d0 100644 --- a/test/fuzz/mempool/checktx.go +++ b/test/fuzz/mempool/checktx.go @@ -31,7 +31,6 @@ func init() { log.NewNopLogger(), cfg, appConnMem, - 0, ) }