Merge branch 'master' into wb/abci++-protos-branch

pull/8065/head
William Banfield committed 3 years ago (via GitHub)
commit 3a77ca990f
24 changed files with 341 additions and 163 deletions

1. +94 -0 docs/tendermint-core/consensus/proposer-based-timestamps.md
2. +25 -19 internal/blocksync/reactor.go
3. +1 -1 internal/blocksync/reactor_test.go
4. +21 -11 internal/consensus/byzantine_test.go
5. +6 -3 internal/consensus/common_test.go
6. +4 -3 internal/consensus/reactor_test.go
7. +17 -12 internal/consensus/replay_file.go
8. +36 -9 internal/consensus/state.go
9. +5 -1 internal/consensus/wal_generator.go
10. +0 -7 internal/eventbus/event_bus.go
11. +1 -2 internal/mempool/mempool.go
12. +1 -1 internal/mempool/mempool_test.go
13. +0 -20 internal/pubsub/pubsub.go
14. +7 -8 internal/rpc/core/env.go
15. +2 -2 internal/rpc/core/net.go
16. +1 -1 internal/rpc/core/status.go
17. +2 -2 internal/state/execution.go
18. +73 -10 internal/state/tx_filter.go
19. +1 -1 internal/state/tx_filter_test.go
20. +20 -37 node/node.go
21. +0 -3 node/node_test.go
22. +11 -9 node/setup.go
23. +13 -0 spec/abci/apps.md
24. +0 -1 test/fuzz/mempool/checktx.go

+94 -0 docs/tendermint-core/consensus/proposer-based-timestamps.md

@ -0,0 +1,94 @@
---
order: 3
---

# PBTS

This document provides an overview of the Proposer-Based Timestamp (PBTS)
algorithm added to Tendermint in the v0.36 release. It outlines the core
functionality as well as the parameters and constraints of this algorithm.
## Algorithm Overview

The PBTS algorithm defines a way for a Tendermint blockchain to create block
timestamps that are within a reasonable bound of the clocks of the validators on
the network. This replaces the original BFTTime algorithm for timestamp
assignment that relied on the timestamps included in precommit messages.

## Algorithm Parameters

The functionality of the PBTS algorithm is governed by two parameters within
Tendermint. These two parameters are [consensus
parameters](https://github.com/tendermint/tendermint/blob/master/spec/abci/apps.md#L291),
meaning they are configured by the ABCI application and are expected to be the
same across all nodes on the network.
### `Precision`

The `Precision` parameter configures the acceptable upper bound on clock drift
among all of the nodes on a Tendermint network. Any two nodes on a Tendermint
network are expected to have clocks that differ by at most `Precision`
milliseconds at any given instant.
### `MessageDelay`

The `MessageDelay` parameter configures the acceptable upper bound on the time
to transmit a `Proposal` message from the proposer to _all_ of the validators
on the network.

Networks should choose as small a value for `MessageDelay` as is practical,
provided it is large enough that messages can reach all participants with high
probability given the number of participants and the latency of their connections.
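
To make the two parameters concrete, the sketch below models them as the plain
durations they are. The `SynchronyParams` struct here is defined locally for
illustration only (the canonical definition lives with the consensus parameters;
see the `SynchronyParams` sections of `spec/abci/apps.md`), and the values are
examples, not recommendations:

```go
package main

import (
	"fmt"
	"time"
)

// SynchronyParams mirrors the two PBTS consensus parameters described
// above. Defined locally for illustration; not the canonical type.
type SynchronyParams struct {
	Precision    time.Duration // acceptable clock drift between any two nodes
	MessageDelay time.Duration // acceptable time for a Proposal to reach all validators
}

func main() {
	// Example values only; each network must tune these for its
	// validator count and expected network latency.
	sp := SynchronyParams{
		Precision:    500 * time.Millisecond,
		MessageDelay: 2 * time.Second,
	}
	fmt.Printf("acceptable timestamp window around local time: [-%v, +%v]\n",
		sp.MessageDelay+sp.Precision, sp.Precision)
}
```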
## Algorithm Concepts

### Block timestamps

Each block produced by the Tendermint consensus engine contains a timestamp.
The timestamp produced in each block is a meaningful representation of time that is
useful for the protocols and applications built on top of Tendermint.

The following protocols and application features require a reliable source of time:

* Tendermint Light Clients [rely on correspondence between their known time](https://github.com/tendermint/tendermint/blob/master/spec/light-client/verification/README.md#definitions-1) and the block time for block verification.
* Tendermint Evidence validity is determined [either in terms of heights or in terms of time](https://github.com/tendermint/tendermint/blob/master/spec/consensus/evidence.md#verification).
* Unbonding of staked assets in the Cosmos Hub [occurs after a period of 21
  days](https://github.com/cosmos/governance/blob/master/params-change/Staking.md#unbondingtime).
* IBC packets can use either a [timestamp or a height to timeout packet
  delivery](https://docs.cosmos.network/v0.44/ibc/overview.html#acknowledgements).
### Proposer Selects a Block Timestamp

When the proposer node creates a new block proposal, the node reads the time
from its local clock and uses this reading as the timestamp for the proposed
block.
### Timeliness

When each validator on a Tendermint network receives a proposed block, it
performs a series of checks to ensure that the block can be considered valid as
a candidate to be the next block in the chain.

The PBTS algorithm performs a validity check on the timestamp of proposed
blocks. When a validator receives a proposal, it ensures that the timestamp in
the proposal is within a bound of the validator's local clock. Specifically, the
algorithm checks that the timestamp is no more than `Precision` ahead of the
node's local clock and no more than `MessageDelay` + `Precision` behind the
node's local clock. This creates a range of acceptable timestamps around the
node's local time. If the timestamp is within this range, the PBTS algorithm
considers the block **timely**. If a block is not **timely**, the node will
issue a `nil` `prevote` for this block, signaling to the rest of the network
that the node does not consider the block to be valid.
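
Expressed as code, the acceptance check looks roughly like the sketch below.
This is a minimal illustration of the rule just described, not the actual
implementation in `internal/consensus`:

```go
package pbts

import "time"

// isTimely reports whether a proposed block timestamp falls within the
// acceptable window around the validator's local clock: no more than
// precision ahead of it, and no more than messageDelay+precision behind it.
func isTimely(timestamp, localTime time.Time, precision, messageDelay time.Duration) bool {
	earliest := localTime.Add(-(messageDelay + precision)) // oldest acceptable timestamp
	latest := localTime.Add(precision)                     // newest acceptable timestamp
	return !timestamp.Before(earliest) && !timestamp.After(latest)
}
```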
### Clock Synchronization

The PBTS algorithm requires that the clocks of the validators on a Tendermint
network be within `Precision` of each other. In practice, this means that
validators should periodically synchronize their clocks with a reliable NTP
server. Validators that drift too far away from the rest of the network will no
longer propose blocks with valid timestamps, and they will not consider the
timestamps of blocks proposed by their peers to be valid either.
## See Also

* [The PBTS specification](https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md)
  contains all of the details of the algorithm.

+25 -19 internal/blocksync/reactor.go

@ -70,6 +70,8 @@ type Reactor struct {
// immutable
initialState sm.State
// store
stateStore sm.Store
blockExec *sm.BlockExecutor
store *store.BlockStore
@ -101,7 +103,7 @@ type Reactor struct {
func NewReactor(
ctx context.Context,
logger log.Logger,
state sm.State,
stateStore sm.Store,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
@ -111,19 +113,6 @@ func NewReactor(
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
if state.LastBlockHeight != store.Height() {
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
}
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
@ -131,20 +120,16 @@ func NewReactor(
r := &Reactor{
logger: logger,
initialState: state,
stateStore: stateStore,
blockExec: blockExec,
store: store,
pool: NewBlockPool(logger, startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
syncStartTime: time.Time{},
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@ -159,6 +144,27 @@ func NewReactor(
// If blockSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
state, err := r.stateStore.Load()
if err != nil {
return err
}
r.initialState = state
if state.LastBlockHeight != r.store.Height() {
return fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, r.store.Height())
}
startHeight := r.store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.requestsCh = requestsCh
r.errorsCh = errorsCh
if r.blockSync.IsSet() {
if err := r.pool.Start(ctx); err != nil {
return err


+1 -1 internal/blocksync/reactor_test.go

@ -176,7 +176,7 @@ func (rts *reactorTestSuite) addNode(
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.logger.With("nodeID", nodeID),
state.Copy(),
stateStore,
blockExec,
blockStore,
nil,


+21 -11 internal/consensus/byzantine_test.go

@ -82,7 +82,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
log.TestingLogger().With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@ -95,7 +94,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
require.NoError(t, err)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)
@ -105,14 +105,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
require.NoError(t, err)
cs.SetEventBus(eventBus)
evpool.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
states[i] = cs
}()
}
rts := setup(ctx, t, nValidators, states, 100) // buffer must be large enough to not deadlock
rts := setup(ctx, t, nValidators, states, 512) // buffer must be large enough to not deadlock
var bzNodeID types.NodeID
@ -238,8 +237,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
}
// Evidence should be submitted and committed at the third height but
@ -248,20 +246,26 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
var wg sync.WaitGroup
i := 0
subctx, subcancel := context.WithCancel(ctx)
defer subcancel()
for _, sub := range rts.subs {
wg.Add(1)
go func(j int, s eventbus.Subscription) {
defer wg.Done()
for {
if ctx.Err() != nil {
if subctx.Err() != nil {
return
}
msg, err := s.Next(subctx)
if subctx.Err() != nil {
return
}
msg, err := s.Next(ctx)
assert.NoError(t, err)
if err != nil {
cancel()
t.Errorf("waiting for subscription: %v", err)
subcancel()
return
}
@ -273,12 +277,18 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
}(i, sub)
i++
}
wg.Wait()
// don't run more assertions if we've encountered a timeout
select {
case <-subctx.Done():
t.Fatal("encountered timeout")
default:
}
pubkey, err := bzNodeState.privValidator.GetPubKey(ctx)
require.NoError(t, err)


+6 -3 internal/consensus/common_test.go

@ -469,7 +469,6 @@ func newStateWithConfigAndBlockStore(
logger.With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
@ -484,15 +483,19 @@ func newStateWithConfigAndBlockStore(
require.NoError(t, stateStore.Save(state))
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx,
cs, err := NewState(ctx,
logger.With("module", "consensus"),
thisConfig.Consensus,
state,
stateStore,
blockExec,
blockStore,
mempool,
evpool,
)
if err != nil {
t.Fatal(err)
}
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(logger.With("module", "events"))


+4 -3 internal/consensus/reactor_test.go

@ -461,6 +461,7 @@ func TestReactorWithEvidence(t *testing.T) {
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
require.NoError(t, stateStore.Save(state))
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
require.NoError(t, err)
@ -483,7 +484,6 @@ func TestReactorWithEvidence(t *testing.T) {
log.TestingLogger().With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
@ -506,8 +506,9 @@ func TestReactorWithEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2)
require.NoError(t, err)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))


+17 -12 internal/consensus/replay_file.go

@ -84,7 +84,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro
return err
}
pb := newPlayback(file, fp, cs, cs.state.Copy())
pb := newPlayback(file, fp, cs, cs.stateStore)
defer pb.fp.Close()
var nextN int // apply N msgs in a row
@ -126,17 +126,17 @@ type playback struct {
count int // how many lines/msgs into the file are we
// replays can be reset to beginning
fileName string // so we can close/reopen the file
genesisState sm.State // so the replay session knows where to restart from
fileName string // so we can close/reopen the file
stateStore sm.Store
}
func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
func newPlayback(fileName string, fp *os.File, cs *State, store sm.Store) *playback {
return &playback{
cs: cs,
fp: fp,
fileName: fileName,
genesisState: genState,
dec: NewWALDecoder(fp),
cs: cs,
fp: fp,
fileName: fileName,
stateStore: store,
dec: NewWALDecoder(fp),
}
}
@ -145,8 +145,11 @@ func (pb *playback) replayReset(ctx context.Context, count int, newStepSub event
pb.cs.Stop()
pb.cs.Wait()
newCS := NewState(ctx, pb.cs.logger, pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
newCS, err := NewState(ctx, pb.cs.logger, pb.cs.config, pb.stateStore, pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
if err != nil {
return err
}
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
@ -345,9 +348,11 @@ func newConsensusStateForReplay(
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec,
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool)
if err != nil {
return nil, err
}
consensusState.SetEventBus(eventBus)
return consensusState, nil
}

+36 -9 internal/consensus/state.go

@ -121,6 +121,9 @@ type State struct {
// store blocks and commits
blockStore sm.BlockStore
stateStore sm.Store
initialStatePopulated bool
// create and execute blocks
blockExec *sm.BlockExecutor
@ -189,18 +192,19 @@ func NewState(
ctx context.Context,
logger log.Logger,
cfg *config.ConsensusConfig,
state sm.State,
store sm.Store,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
evpool evidencePool,
options ...StateOption,
) *State {
) (*State, error) {
cs := &State{
logger: logger,
config: cfg,
blockExec: blockExec,
blockStore: blockStore,
stateStore: store,
txNotifier: txNotifier,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
@ -220,21 +224,40 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
cs.updateToState(ctx, state)
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {
option(cs)
}
return cs
return cs, nil
}
func (cs *State) updateStateFromStore(ctx context.Context) error {
if cs.initialStatePopulated {
return nil
}
state, err := cs.stateStore.Load()
if err != nil {
return fmt.Errorf("loading state: %w", err)
}
if state.IsEmpty() {
return nil
}
// We have no votes, so reconstruct LastCommit from SeenCommit.
if state.LastBlockHeight > 0 {
cs.reconstructLastCommit(state)
}
cs.updateToState(ctx, state)
cs.initialStatePopulated = true
return nil
}
// SetEventBus sets event bus.
@ -365,6 +388,10 @@ func (cs *State) LoadCommit(height int64) *types.Commit {
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
func (cs *State) OnStart(ctx context.Context) error {
if err := cs.updateStateFromStore(ctx); err != nil {
return err
}
// We may set the WAL in testing before calling Start, so only OpenWAL if it's
// still the nilWAL.
if _, ok := cs.wal.(nilWAL); ok {


+5 -1 internal/consensus/wal_generator.go

@ -83,7 +83,11 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
if err != nil {
t.Fatal(err)
}
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(ctx, privValidator)


+0 -7 internal/eventbus/event_bus.go

@ -50,13 +50,6 @@ func (b *EventBus) NumClientSubscriptions(clientID string) int {
return b.pubsub.NumClientSubscriptions(clientID)
}
// Deprecated: Use SubscribeWithArgs instead.
func (b *EventBus) Subscribe(ctx context.Context,
clientID string, query *tmquery.Query, capacities ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, clientID, query, capacities...)
}
func (b *EventBus) SubscribeWithArgs(ctx context.Context, args tmpubsub.SubscribeArgs) (Subscription, error) {
return b.pubsub.SubscribeWithArgs(ctx, args)
}


+1 -2 internal/mempool/mempool.go

@ -94,7 +94,6 @@ func NewTxMempool(
logger log.Logger,
cfg *config.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
height int64,
options ...TxMempoolOption,
) *TxMempool {
@ -102,7 +101,7 @@ func NewTxMempool(
logger: logger,
config: cfg,
proxyAppConn: proxyAppConn,
height: height,
height: -1,
cache: NopTxCache{},
metrics: NopMetrics(),
txStore: NewTxStore(),


+1 -1 internal/mempool/mempool_test.go

@ -95,7 +95,7 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
appConnMem.Wait()
})
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
}
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {


+0 -20 internal/pubsub/pubsub.go

@ -153,26 +153,6 @@ func BufferCapacity(cap int) Option {
// BufferCapacity returns capacity of the publication queue.
func (s *Server) BufferCapacity() int { return cap(s.queue) }
// Subscribe creates a subscription for the given client ID and query.
// If len(capacities) > 0, its first value is used as the queue capacity.
//
// Deprecated: Use SubscribeWithArgs. This method will be removed in v0.36.
func (s *Server) Subscribe(ctx context.Context, clientID string, query *query.Query, capacities ...int) (*Subscription, error) {
args := SubscribeArgs{
ClientID: clientID,
Query: query,
Limit: 1,
}
if len(capacities) > 0 {
args.Limit = capacities[0]
if len(capacities) > 1 {
args.Quota = capacities[1]
}
// bounds are checked below
}
return s.SubscribeWithArgs(ctx, args)
}
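Call sites of the removed method move to `SubscribeWithArgs`, passing the same `SubscribeArgs` the deprecated wrapper built internally. A sketch of the migration, written as if inside `internal/pubsub` and assuming a caller that previously passed a single capacity value:

```go
// migrateSubscribe shows the replacement call shape. clientID, q, and
// the Limit of 64 stand in for whatever the old caller passed; a second
// capacity value, if the caller used one, becomes Quota.
func migrateSubscribe(ctx context.Context, s *Server, clientID string, q *query.Query) (*Subscription, error) {
	// Before (deprecated, removed above):
	//   return s.Subscribe(ctx, clientID, q, 64)
	return s.SubscribeWithArgs(ctx, SubscribeArgs{
		ClientID: clientID,
		Query:    q,
		Limit:    64,
	})
}
```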
// Observe registers an observer function that will be called synchronously
// with each published message matching any of the given queries, prior to it
// being forwarded to any subscriber. If no queries are specified, all


+7 -8 internal/rpc/core/env.go

@ -57,12 +57,6 @@ type consensusState interface {
GetRoundStateSimpleJSON() ([]byte, error)
}
type transport interface {
Listeners() []string
IsListening() bool
NodeInfo() types.NodeInfo
}
type peerManager interface {
Peers() []types.NodeID
Addresses(types.NodeID) []p2p.NodeAddress
@ -84,8 +78,9 @@ type Environment struct {
ConsensusReactor *consensus.Reactor
BlockSyncReactor *blocksync.Reactor
// Legacy p2p stack
P2PTransport transport
IsListening bool
Listeners []string
NodeInfo types.NodeInfo
// interfaces for new p2p interfaces
PeerManager peerManager
@ -226,6 +221,10 @@ func (env *Environment) StartService(ctx context.Context, conf *config.Config) (
return nil, err
}
env.Listeners = []string{
fmt.Sprintf("Listener(@%v)", conf.P2P.ExternalAddress),
}
listenAddrs := strings.SplitAndTrimEmpty(conf.RPC.ListenAddress, ",", " ")
routes := NewRoutesMap(env, &RouteOptions{
Unsafe: conf.RPC.Unsafe,


+2 -2 internal/rpc/core/net.go

@ -27,8 +27,8 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo,
}
return &coretypes.ResultNetInfo{
Listening: env.P2PTransport.IsListening(),
Listeners: env.P2PTransport.Listeners(),
Listening: env.IsListening,
Listeners: env.Listeners,
NPeers: len(peers),
Peers: peers,
}, nil


+1 -1 internal/rpc/core/status.go

@ -66,7 +66,7 @@ func (env *Environment) Status(ctx context.Context) (*coretypes.ResultStatus, er
}
result := &coretypes.ResultStatus{
NodeInfo: env.P2PTransport.NodeInfo(),
NodeInfo: env.NodeInfo,
ApplicationInfo: applicationInfo,
SyncInfo: coretypes.SyncInfo{
LatestBlockHash: latestBlockHash,


+2 -2 internal/state/execution.go

@ -360,8 +360,8 @@ func (blockExec *BlockExecutor) Commit(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
TxPreCheckForState(state),
TxPostCheckForState(state),
)
return res.Data, res.RetainHeight, err


+73 -10 internal/state/tx_filter.go

@ -1,22 +1,85 @@
package state
import (
"sync"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/types"
)
// TxPreCheck returns a function to filter transactions before processing.
// The function limits the size of a transaction to the block's maximum data size.
func TxPreCheck(state State) mempool.PreCheckFunc {
maxDataBytes := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
func cachingStateFetcher(store Store) func() (State, error) {
const ttl = time.Second
var (
last time.Time
mutex = &sync.Mutex{}
cache State
err error
)
return mempool.PreCheckMaxBytes(maxDataBytes)
return func() (State, error) {
mutex.Lock()
defer mutex.Unlock()
if time.Since(last) < ttl && cache.ChainID != "" {
return cache, nil
}
cache, err = store.Load()
if err != nil {
return State{}, err
}
last = time.Now()
return cache, nil
}
}
// TxPostCheck returns a function to filter transactions after processing.
// TxPreCheckFromStore returns a function to filter transactions before processing.
// The function limits the size of a transaction to the block's maximum data size.
func TxPreCheckFromStore(store Store) mempool.PreCheckFunc {
fetch := cachingStateFetcher(store)
return func(tx types.Tx) error {
state, err := fetch()
if err != nil {
return err
}
return TxPreCheckForState(state)(tx)
}
}
func TxPreCheckForState(state State) mempool.PreCheckFunc {
return func(tx types.Tx) error {
maxDataBytes := types.MaxDataBytesNoEvidence(
state.ConsensusParams.Block.MaxBytes,
state.Validators.Size(),
)
return mempool.PreCheckMaxBytes(maxDataBytes)(tx)
}
}
// TxPostCheckFromStore returns a function to filter transactions after processing.
// The function limits the gas wanted by a transaction to the block's maximum total gas.
func TxPostCheck(state State) mempool.PostCheckFunc {
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
func TxPostCheckFromStore(store Store) mempool.PostCheckFunc {
fetch := cachingStateFetcher(store)
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
state, err := fetch()
if err != nil {
return err
}
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
}
}
func TxPostCheckForState(state State) mempool.PostCheckFunc {
return func(tx types.Tx, resp *abci.ResponseCheckTx) error {
return mempool.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)(tx, resp)
}
}
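For how these are consumed: the `node/setup.go` diff below swaps the mempool's pre/post-check options over to the store-backed variants, so each check re-reads state through the cached fetcher instead of using a `State` snapshot captured at construction time. Roughly, eliding the surrounding reactor setup (identifiers come from the enclosing `createMempoolReactor`):

```go
// Sketch of the new wiring in createMempoolReactor (see node/setup.go below).
mp := mempool.NewTxMempool(
	logger,
	cfg.Mempool,
	proxyApp.Mempool(),
	mempool.WithMetrics(memplMetrics),
	mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
	mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
```

The one-second TTL in `cachingStateFetcher` bounds how often the filters hit the state store under CheckTx load, while still observing consensus-parameter changes shortly after they are committed.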

+1 -1 internal/state/tx_filter_test.go

@ -31,7 +31,7 @@ func TestTxFilter(t *testing.T) {
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
f := sm.TxPreCheck(state)
f := sm.TxPreCheckForState(state)
if tc.isErr {
assert.NotNil(t, f(tc.tx), "#%v", i)
} else {


+20 -37 node/node.go

@ -58,7 +58,6 @@ type nodeImpl struct {
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
// services
eventSinks []indexer.EventSink
@ -144,11 +143,8 @@ func makeNode(
return nil, combineCloseError(err, makeCloser(closers))
}
err = genDoc.ValidateAndComplete()
if err != nil {
return nil, combineCloseError(
fmt.Errorf("error in genesis doc: %w", err),
makeCloser(closers))
if err = genDoc.ValidateAndComplete(); err != nil {
return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers))
}
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
@ -242,10 +238,6 @@ func makeNode(
}
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
@ -272,14 +264,14 @@ func makeNode(
}
mpReactor, mp, err := createMempoolReactor(ctx,
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
evReactor, evPool, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@ -296,8 +288,12 @@ func makeNode(
sm.BlockExecutorWithMetrics(nodeMetrics.state),
)
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
csReactor, csState, err := createConsensusReactor(ctx,
cfg, state, blockExec, blockStore, mp, evPool,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger,
)
@ -309,7 +305,7 @@ func makeNode(
// doing a state sync first.
bcReactor, err := blocksync.NewReactor(ctx,
logger.With("module", "blockchain"),
state.Copy(),
stateStore,
blockExec,
blockStore,
csReactor,
@ -421,8 +417,6 @@ func makeNode(
node.rpcEnv.PubKey = pubKey
}
node.rpcEnv.P2PTransport = node
node.BaseService = *service.NewBaseService(logger, "Node", node)
return node, nil
@ -467,6 +461,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
}
n.rpcEnv.NodeInfo = n.nodeInfo
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" {
@ -485,7 +480,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
if err := n.router.Start(ctx); err != nil {
return err
}
n.isListening = true
n.rpcEnv.IsListening = true
for _, reactor := range n.services {
if err := reactor.Start(ctx); err != nil {
@ -580,7 +575,7 @@ func (n *nodeImpl) OnStop() {
n.stateSyncReactor.Wait()
n.router.Wait()
n.isListening = false
n.rpcEnv.IsListening = false
// finally stop the listeners / external services
for _, l := range n.rpcListeners {
@ -669,21 +664,6 @@ func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
//------------------------------------------------------------------------------
func (n *nodeImpl) Listeners() []string {
return []string{
fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
}
}
func (n *nodeImpl) IsListening() bool {
return n.isListening
}
// NodeInfo returns the Node's Info from the Switch.
func (n *nodeImpl) NodeInfo() types.NodeInfo {
return n.nodeInfo
}
// genesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
@ -747,10 +727,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
// loadStateFromDBOrGenesisDocProvider attempts to load the state from the
// database, or creates one using the given genesisDocProvider. On success this also
// returns the genesis doc loaded through the given provider.
func loadStateFromDBOrGenesisDocProvider(
stateStore sm.Store,
genDoc *types.GenesisDoc,
) (sm.State, error) {
func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) {
// 1. Attempt to load state from the database
state, err := stateStore.Load()
@ -764,6 +741,12 @@ func loadStateFromDBOrGenesisDocProvider(
if err != nil {
return sm.State{}, err
}
// 3. save the genesis state to the state store so
// it's fetchable by other callers.
if err := stateStore.Save(state); err != nil {
return sm.State{}, err
}
}
return state, nil


+0 -3 node/node_test.go

@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// Make EvidencePool
@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// fill the mempool with one txs just below the maximum size
@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
)
// fill the mempool with one txs just below the maximum size


+11 -9 node/setup.go

@ -172,7 +172,7 @@ func createMempoolReactor(
ctx context.Context,
cfg *config.Config,
proxyApp proxy.AppConns,
state sm.State,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
@ -184,10 +184,9 @@ func createMempoolReactor(
logger,
cfg.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheck(state)),
mempool.WithPostCheck(sm.TxPostCheck(state)),
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
reactor, err := mempool.NewReactor(
@ -214,7 +213,7 @@ func createEvidenceReactor(
ctx context.Context,
cfg *config.Config,
dbProvider config.DBProvider,
stateDB dbm.DB,
store sm.Store,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
@ -229,7 +228,7 @@ func createEvidenceReactor(
logger = logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
if err != nil {
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
}
@ -253,7 +252,7 @@ func createEvidenceReactor(
func createConsensusReactor(
ctx context.Context,
cfg *config.Config,
state sm.State,
store sm.Store,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mp mempool.Mempool,
@ -268,16 +267,19 @@ func createConsensusReactor(
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
consensusState := consensus.NewState(ctx,
consensusState, err := consensus.NewState(ctx,
logger,
cfg.Consensus,
state.Copy(),
store,
blockExec,
blockStore,
mp,
evidencePool,
consensus.StateMetrics(csMetrics),
)
if err != nil {
return nil, nil, err
}
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(ctx, privValidator)


+13 -0 spec/abci/apps.md

@ -346,6 +346,19 @@ a block minus its overhead ( ~ `MaxBytes`).
Must have `MaxNum > 0`.
### SynchronyParams.Precision

`SynchronyParams.Precision` is a parameter of the Proposer-Based Timestamps
algorithm that configures the acceptable upper bound of clock drift among
all of the nodes on a Tendermint network. Any two nodes on a Tendermint network
are expected to have clocks that differ by at most `Precision`.

### SynchronyParams.MessageDelay

`SynchronyParams.MessageDelay` is a parameter of the Proposer-Based Timestamps
algorithm that configures the acceptable upper bound for transmitting a `Proposal`
message from the proposer to all of the validators on the network.
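
Because these are ordinary consensus parameters, an application enables and tunes
PBTS from ABCI, for example in its `InitChain` response (see Updates below). A
hedged sketch; the `Synchrony` field name and proto shapes are assumed to mirror
this spec, so consult the release's ABCI/proto definitions for the exact types:

```go
// Illustrative only: assumes ConsensusParams exposes a Synchrony
// section with the Precision and MessageDelay fields described above.
func (app *MyApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
	params := req.ConsensusParams // params proposed in the genesis document
	params.Synchrony = &tmproto.SynchronyParams{
		Precision:    500 * time.Millisecond, // example value
		MessageDelay: 2 * time.Second,        // example value
	}
	return abci.ResponseInitChain{ConsensusParams: params}
}
```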
### Updates

The application may set the ConsensusParams during InitChain, and update them during


+0 -1 test/fuzz/mempool/checktx.go

@ -31,7 +31,6 @@ func init() {
log.NewNopLogger(),
cfg,
appConnMem,
0,
)
}

