Browse Source

contexts: remove all TODO instances (#7466)

pull/7469/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
bef120dadf
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 128 additions and 128 deletions
  1. +3
    -3
      cmd/tendermint/commands/init.go
  2. +4
    -3
      cmd/tendermint/commands/show_validator.go
  3. +4
    -4
      cmd/tendermint/commands/testnet.go
  4. +4
    -4
      internal/consensus/byzantine_test.go
  5. +22
    -21
      internal/consensus/common_test.go
  6. +1
    -1
      internal/consensus/reactor_test.go
  7. +2
    -2
      internal/consensus/replay.go
  8. +2
    -3
      internal/consensus/replay_file.go
  9. +15
    -14
      internal/consensus/replay_test.go
  10. +15
    -14
      internal/consensus/state.go
  11. +1
    -1
      internal/consensus/state_test.go
  12. +1
    -1
      internal/consensus/wal_generator.go
  13. +0
    -4
      internal/mempool/mempool.go
  14. +3
    -4
      internal/state/execution.go
  15. +1
    -1
      internal/state/indexer/indexer_service.go
  16. +5
    -2
      internal/state/indexer/sink/null/null_test.go
  17. +4
    -4
      internal/statesync/reactor.go
  18. +12
    -18
      internal/statesync/syncer.go
  19. +13
    -13
      internal/statesync/syncer_test.go
  20. +1
    -0
      node/node.go
  21. +1
    -1
      node/setup.go
  22. +1
    -1
      privval/grpc/util.go
  23. +3
    -3
      privval/signer_server.go
  24. +4
    -1
      types/evidence_test.go
  25. +2
    -2
      types/validator_set_test.go
  26. +2
    -2
      types/validator_test.go
  27. +2
    -1
      types/vote_set_test.go

+ 3
- 3
cmd/tendermint/commands/init.go View File

@ -39,10 +39,10 @@ func initFiles(cmd *cobra.Command, args []string) error {
return errors.New("must specify a node type: tendermint init [validator|full|seed]")
}
config.Mode = args[0]
return initFilesWithConfig(config)
return initFilesWithConfig(cmd.Context(), config)
}
func initFilesWithConfig(config *cfg.Config) error {
func initFilesWithConfig(ctx context.Context, config *cfg.Config) error {
var (
pv *privval.FilePV
err error
@ -98,7 +98,7 @@ func initFilesWithConfig(config *cfg.Config) error {
}
}
ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout)
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()
// if this is a validator we add it to genesis


+ 4
- 3
cmd/tendermint/commands/show_validator.go View File

@ -25,13 +25,14 @@ func showValidator(cmd *cobra.Command, args []string) error {
var (
pubKey crypto.PubKey
err error
bctx = cmd.Context()
)
//TODO: remove once gRPC is the only supported protocol
protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidator.ListenAddr)
switch protocol {
case "grpc":
pvsc, err := tmgrpc.DialRemoteSigner(
bctx,
config.PrivValidator,
config.ChainID(),
logger,
@ -41,7 +42,7 @@ func showValidator(cmd *cobra.Command, args []string) error {
return fmt.Errorf("can't connect to remote validator %w", err)
}
ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout)
ctx, cancel := context.WithTimeout(bctx, ctxTimeout)
defer cancel()
pubKey, err = pvsc.GetPubKey(ctx)
@ -60,7 +61,7 @@ func showValidator(cmd *cobra.Command, args []string) error {
return err
}
ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout)
ctx, cancel := context.WithTimeout(bctx, ctxTimeout)
defer cancel()
pubKey, err = pv.GetPubKey(ctx)


+ 4
- 4
cmd/tendermint/commands/testnet.go View File

@ -122,7 +122,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
}
genVals := make([]types.GenesisValidator, nValidators)
ctx := cmd.Context()
for i := 0; i < nValidators; i++ {
nodeDirName := fmt.Sprintf("%s%d", nodeDirPrefix, i)
nodeDir := filepath.Join(outputDir, nodeDirName)
@ -139,7 +139,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
return err
}
if err := initFilesWithConfig(config); err != nil {
if err := initFilesWithConfig(ctx, config); err != nil {
return err
}
@ -150,7 +150,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
return err
}
ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout)
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()
pubKey, err := pv.GetPubKey(ctx)
@ -181,7 +181,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
return err
}
if err := initFilesWithConfig(config); err != nil {
if err := initFilesWithConfig(ctx, config); err != nil {
return err
}
}


+ 4
- 4
internal/consensus/byzantine_test.go View File

@ -92,7 +92,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(pv)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
err = eventBus.Start(ctx)
@ -125,14 +125,14 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
bzNodeState.doPrevote = func(ctx context.Context, height int64, round int32) {
// allow first height to happen normally so that byzantine validator is no longer proposer
if height == prevoteHeight {
prevote1, err := bzNodeState.signVote(
prevote1, err := bzNodeState.signVote(ctx,
tmproto.PrevoteType,
bzNodeState.ProposalBlock.Hash(),
bzNodeState.ProposalBlockParts.Header(),
)
require.NoError(t, err)
prevote2, err := bzNodeState.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
prevote2, err := bzNodeState.signVote(ctx, tmproto.PrevoteType, nil, types.PartSetHeader{})
require.NoError(t, err)
// send two votes to all peers (1st to one half, 2nd to another half)
@ -172,7 +172,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// lazyProposer := states[1]
lazyNodeState := states[1]
lazyNodeState.decideProposal = func(height int64, round int32) {
lazyNodeState.decideProposal = func(ctx context.Context, height int64, round int32) {
lazyNodeState.logger.Info("Lazy Proposer proposing condensed commit")
require.NotNil(t, lazyNodeState.privValidator)


+ 22
- 21
internal/consensus/common_test.go View File

@ -8,6 +8,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"testing"
"time"
@ -195,28 +196,28 @@ func (vss ValidatorStubsByPower) Len() int {
return len(vss)
}
func (vss ValidatorStubsByPower) Less(i, j int) bool {
vssi, err := vss[i].GetPubKey(context.TODO())
if err != nil {
panic(err)
}
vssj, err := vss[j].GetPubKey(context.TODO())
if err != nil {
panic(err)
}
func sortVValidatorStubsByPower(ctx context.Context, vss []*validatorStub) []*validatorStub {
sort.Slice(vss, func(i, j int) bool {
vssi, err := vss[i].GetPubKey(ctx)
if err != nil {
panic(err)
}
vssj, err := vss[j].GetPubKey(ctx)
if err != nil {
panic(err)
}
if vss[i].VotingPower == vss[j].VotingPower {
return bytes.Compare(vssi.Address(), vssj.Address()) == -1
if vss[i].VotingPower == vss[j].VotingPower {
return bytes.Compare(vssi.Address(), vssj.Address()) == -1
}
return vss[i].VotingPower > vss[j].VotingPower
})
for idx, vs := range vss {
vs.Index = int32(idx)
}
return vss[i].VotingPower > vss[j].VotingPower
}
func (vss ValidatorStubsByPower) Swap(i, j int) {
it := vss[i]
vss[i] = vss[j]
vss[i].Index = int32(i)
vss[j] = it
vss[j].Index = int32(j)
return vss
}
//-------------------------------------------------------------------------------
@ -475,7 +476,7 @@ func newStateWithConfigAndBlockStore(
mempool,
evpool,
)
cs.SetPrivValidator(pv)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(logger.With("module", "events"))
err := eventBus.Start(ctx)
@ -814,7 +815,7 @@ func randConsensusState(
func randConsensusNetWithPeers(
ctx context.Context,
cfg *config.Config,
nValidators,
nValidators int,
nPeers int,
testName string,
tickerFunc func() TimeoutTicker,


+ 1
- 1
internal/consensus/reactor_test.go View File

@ -422,7 +422,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetPrivValidator(pv)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start(ctx))


+ 2
- 2
internal/consensus/replay.go View File

@ -50,9 +50,9 @@ func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, ne
cs.logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
if newStepSub != nil {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
ctxto, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
stepMsg, err := newStepSub.Next(ctx)
stepMsg, err := newStepSub.Next(ctxto)
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("subscription timed out: %w", err)
} else if err != nil {


+ 2
- 3
internal/consensus/replay_file.go View File

@ -91,7 +91,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro
var msg *TimedWALMessage
for {
if nextN == 0 && console {
nextN, err = pb.replayConsoleLoop()
nextN, err = pb.replayConsoleLoop(ctx)
if err != nil {
return err
}
@ -187,7 +187,7 @@ func (cs *State) startForReplay() {
// console function for parsing input and running commands. The integer
// return value is invalid unless the error is nil.
func (pb *playback) replayConsoleLoop() (int, error) {
func (pb *playback) replayConsoleLoop(ctx context.Context) (int, error) {
for {
fmt.Printf("> ")
bufReader := bufio.NewReader(os.Stdin)
@ -225,7 +225,6 @@ func (pb *playback) replayConsoleLoop() (int, error) {
// NOTE: "back" is not supported in the state machine design,
// so we restart and replay up to
ctx := context.TODO()
// ensure all new step events are regenerated as expected
newStepSub, err := pb.cs.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{


+ 15
- 14
internal/consensus/replay_test.go View File

@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"runtime"
"sort"
"testing"
"time"
@ -93,9 +92,9 @@ func startNewStateAndWaitForBlock(ctx context.Context, t *testing.T, consensusRe
Query: types.EventQueryNewBlock,
})
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, 120*time.Second)
ctxto, cancel := context.WithTimeout(ctx, 120*time.Second)
defer cancel()
_, err = newBlockSub.Next(ctx)
_, err = newBlockSub.Next(ctxto)
if errors.Is(err, context.DeadlineExceeded) {
t.Fatal("Timed out waiting for new block (see trace above)")
} else if err != nil {
@ -148,7 +147,7 @@ func TestWALCrash(t *testing.T) {
}
}
func crashWALandCheckLiveness(ctx context.Context, t *testing.T, consensusReplayConfig *config.Config,
func crashWALandCheckLiveness(rctx context.Context, t *testing.T, consensusReplayConfig *config.Config,
initFn func(dbm.DB, *State, context.Context), heightToStop int64) {
walPanicked := make(chan error)
crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
@ -168,7 +167,7 @@ LOOP:
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
ctx,
rctx,
logger,
consensusReplayConfig,
state,
@ -178,7 +177,7 @@ LOOP:
)
// start sending transactions
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(rctx)
initFn(stateDB, cs, ctx)
// clean up WAL file from the previous iteration
@ -342,7 +341,10 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
newMockTickerFunc(true),
newPersistentKVStoreWithPath)
sim.Config = cfg
sim.GenesisState, _ = sm.MakeGenesisState(genDoc)
var err error
sim.GenesisState, err = sm.MakeGenesisState(genDoc)
require.NoError(t, err)
sim.CleanupFunc = cleanup
partSize := types.BlockPartSizeBytes
@ -455,7 +457,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss := make([]*validatorStub, nVals+1)
copy(newVss, vss[:nVals+1])
sort.Sort(ValidatorStubsByPower(newVss))
newVss = sortVValidatorStubsByPower(ctx, newVss)
valIndexFn := func(cssIdx int) int {
for i, vs := range newVss {
@ -500,7 +502,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
tmproto.PrecommitType, rs.ProposalBlock.Hash(),
rs.ProposalBlockParts.Header(), newVss[i])
}
ensureNewRound(newRoundCh, height+1, 0)
// HEIGHT 5
@ -509,7 +510,8 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
// Reflect the changes to vss[nVals] at height 3 and resort newVss.
newVssIdx := valIndexFn(nVals)
newVss[newVssIdx].VotingPower = 25
sort.Sort(ValidatorStubsByPower(newVss))
newVss = sortVValidatorStubsByPower(ctx, newVss)
selfIndex = valIndexFn(0)
ensureNewProposal(proposalCh, height, round)
rs = css[0].GetRoundState()
@ -534,7 +536,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss = make([]*validatorStub, nVals+3)
copy(newVss, vss[:nVals+3])
sort.Sort(ValidatorStubsByPower(newVss))
newVss = sortVValidatorStubsByPower(ctx, newVss)
selfIndex = valIndexFn(0)
proposal = types.NewProposal(vss[1].Height, round, -1, blockID)
@ -709,7 +711,7 @@ func tempWALWithData(data []byte) string {
// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(
ctx context.Context,
rctx context.Context,
t *testing.T,
sim *simulatorTestSuite,
nBlocks int,
@ -721,9 +723,8 @@ func testHandshakeReplay(
var store *mockBlockStore
var stateDB dbm.DB
var genesisState sm.State
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
ctx, cancel := context.WithCancel(rctx)
t.Cleanup(cancel)
cfg := sim.Config


+ 15
- 14
internal/consensus/state.go View File

@ -131,7 +131,7 @@ type State struct {
nSteps int
// some functions can be overwritten for testing
decideProposal func(height int64, round int32)
decideProposal func(ctx context.Context, height int64, round int32)
doPrevote func(ctx context.Context, height int64, round int32)
setProposal func(proposal *types.Proposal) error
@ -272,7 +272,7 @@ func (cs *State) GetValidators() (int64, []*types.Validator) {
// SetPrivValidator sets the private validator account for signing votes. It
// immediately requests pubkey and caches it.
func (cs *State) SetPrivValidator(priv types.PrivValidator) {
func (cs *State) SetPrivValidator(ctx context.Context, priv types.PrivValidator) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -298,7 +298,7 @@ func (cs *State) SetPrivValidator(priv types.PrivValidator) {
}
}
if err := cs.updatePrivValidatorPubKey(); err != nil {
if err := cs.updatePrivValidatorPubKey(ctx); err != nil {
cs.logger.Error("failed to get private validator pubkey", "err", err)
}
}
@ -1207,7 +1207,7 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) {
"proposer", address,
)
cs.decideProposal(height, round)
cs.decideProposal(ctx, height, round)
} else {
logger.Debug(
"propose step; not our turn to propose",
@ -1220,7 +1220,7 @@ func (cs *State) isProposer(address []byte) bool {
return bytes.Equal(cs.Validators.GetProposer().Address, address)
}
func (cs *State) defaultDecideProposal(height int64, round int32) {
func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round int32) {
var block *types.Block
var blockParts *types.PartSet
@ -1248,9 +1248,9 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
p := proposal.ToProto()
// wait the max amount we would wait for a proposal
ctx, cancel := context.WithTimeout(context.TODO(), cs.config.TimeoutPropose)
ctxto, cancel := context.WithTimeout(ctx, cs.config.TimeoutPropose)
defer cancel()
if err := cs.privValidator.SignProposal(ctx, cs.state.ChainID, p); err == nil {
if err := cs.privValidator.SignProposal(ctxto, cs.state.ChainID, p); err == nil {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
@ -1772,7 +1772,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
fail.Fail() // XXX
// Private validator might have changed it's key pair => refetch pubkey.
if err := cs.updatePrivValidatorPubKey(); err != nil {
if err := cs.updatePrivValidatorPubKey(ctx); err != nil {
logger.Error("failed to get private validator pubkey", "err", err)
}
@ -2250,6 +2250,7 @@ func (cs *State) addVote(
// CONTRACT: cs.privValidator is not nil.
func (cs *State) signVote(
ctx context.Context,
msgType tmproto.SignedMsgType,
hash []byte,
header types.PartSetHeader,
@ -2292,10 +2293,10 @@ func (cs *State) signVote(
timeout = time.Second
}
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
ctxto, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := cs.privValidator.SignVote(ctx, cs.state.ChainID, v)
err := cs.privValidator.SignVote(ctxto, cs.state.ChainID, v)
vote.Signature = v.Signature
vote.Timestamp = v.Timestamp
@ -2344,7 +2345,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType,
}
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
vote, err := cs.signVote(ctx, msgType, hash, header)
if err == nil {
cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, ""})
cs.logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
@ -2358,7 +2359,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType,
// updatePrivValidatorPubKey get's the private validator public key and
// memoizes it. This func returns an error if the private validator is not
// responding or responds with an error.
func (cs *State) updatePrivValidatorPubKey() error {
func (cs *State) updatePrivValidatorPubKey(rctx context.Context) error {
if cs.privValidator == nil {
return nil
}
@ -2377,9 +2378,9 @@ func (cs *State) updatePrivValidatorPubKey() error {
// set context timeout depending on the configuration and the State step,
// this helps in avoiding blocking of the remote signer connection.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
ctxto, cancel := context.WithTimeout(rctx, timeout)
defer cancel()
pubKey, err := cs.privValidator.GetPubKey(ctx)
pubKey, err := cs.privValidator.GetPubKey(ctxto)
if err != nil {
return err
}


+ 1
- 1
internal/consensus/state_test.go View File

@ -160,7 +160,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) {
cs, _, err := randState(ctx, config, log.TestingLogger(), 1)
require.NoError(t, err)
cs.SetPrivValidator(nil)
cs.SetPrivValidator(ctx, nil)
height, round := cs.Height, cs.Round
// Listen for propose timeout event


+ 1
- 1
internal/consensus/wal_generator.go View File

@ -83,7 +83,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBloc
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus)
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
consensusState.SetPrivValidator(privValidator)
consensusState.SetPrivValidator(ctx, privValidator)
}
// END OF COPY PASTE


+ 0
- 4
internal/mempool/mempool.go View File

@ -233,10 +233,6 @@ func (txmp *TxMempool) CheckTx(
cb func(*abci.Response),
txInfo TxInfo,
) error {
if ctx == nil {
ctx = context.TODO()
}
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()


+ 3
- 4
internal/state/execution.go View File

@ -162,7 +162,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
}
startTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(
abciResponses, err := execBlockOnProxyApp(ctx,
blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight,
)
endTime := time.Now().UnixNano()
@ -297,6 +297,7 @@ func (blockExec *BlockExecutor) Commit(
// Executes block's transactions on proxyAppConn.
// Returns a list of transaction results and updates to the validator set
func execBlockOnProxyApp(
ctx context.Context,
logger log.Logger,
proxyAppConn proxy.AppConnConsensus,
block *types.Block,
@ -337,8 +338,6 @@ func execBlockOnProxyApp(
byzVals = append(byzVals, evidence.ABCI()...)
}
ctx := context.TODO()
// Begin block
var err error
pbh := block.Header.ToProto()
@ -584,7 +583,7 @@ func ExecCommitBlock(
initialHeight int64,
s State,
) ([]byte, error) {
abciResponses, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight)
abciResponses, err := execBlockOnProxyApp(ctx, logger, appConnConsensus, block, store, initialHeight)
if err != nil {
logger.Error("failed executing block on proxy app", "height", block.Height, "err", err)
return nil, err


+ 1
- 1
internal/state/indexer/indexer_service.go View File

@ -122,7 +122,7 @@ func (is *Service) OnStart(ctx context.Context) error {
// If the event sinks support indexing, register an observer to capture
// block header data for the indexer.
if IndexingEnabled(is.eventSinks) {
err := is.eventBus.Observe(context.TODO(), is.publish,
err := is.eventBus.Observe(ctx, is.publish,
types.EventQueryNewBlockHeader, types.EventQueryTx)
if err != nil {
return err


+ 5
- 2
internal/state/indexer/sink/null/null_test.go View File

@ -10,14 +10,17 @@ import (
)
func TestNullEventSink(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nullIndexer := NewEventSink()
assert.Nil(t, nullIndexer.IndexTxEvents(nil))
assert.Nil(t, nullIndexer.IndexBlockEvents(types.EventDataNewBlockHeader{}))
val1, err1 := nullIndexer.SearchBlockEvents(context.TODO(), nil)
val1, err1 := nullIndexer.SearchBlockEvents(ctx, nil)
assert.Nil(t, val1)
assert.Nil(t, err1)
val2, err2 := nullIndexer.SearchTxEvents(context.TODO(), nil)
val2, err2 := nullIndexer.SearchTxEvents(ctx, nil)
assert.Nil(t, val2)
assert.Nil(t, err2)
val3, err3 := nullIndexer.GetTxByHash(nil)


+ 4
- 4
internal/statesync/reactor.go View File

@ -508,7 +508,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
switch msg := envelope.Message.(type) {
case *ssproto.SnapshotsRequest:
snapshots, err := r.recentSnapshots(recentSnapshots)
snapshots, err := r.recentSnapshots(ctx, recentSnapshots)
if err != nil {
logger.Error("failed to fetch snapshots", "err", err)
return nil
@ -585,7 +585,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
"chunk", msg.Index,
"peer", envelope.From,
)
resp, err := r.conn.LoadSnapshotChunkSync(context.TODO(), abci.RequestLoadSnapshotChunk{
resp, err := r.conn.LoadSnapshotChunkSync(ctx, abci.RequestLoadSnapshotChunk{
Height: msg.Height,
Format: msg.Format,
Chunk: msg.Index,
@ -875,8 +875,8 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
}
// recentSnapshots fetches the n most recent snapshots from the app
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
resp, err := r.conn.ListSnapshotsSync(context.TODO(), abci.RequestListSnapshots{})
func (r *Reactor) recentSnapshots(ctx context.Context, n uint32) ([]*snapshot, error) {
resp, err := r.conn.ListSnapshotsSync(ctx, abci.RequestListSnapshots{})
if err != nil {
return nil, err
}


+ 12
- 18
internal/statesync/syncer.go View File

@ -348,10 +348,12 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
return sm.State{}, nil, err
}
// Verify app and app version
if err := s.verifyApp(snapshot, state.Version.Consensus.App); err != nil {
// Verify app and update app version
appVersion, err := s.verifyApp(ctx, snapshot)
if err != nil {
return sm.State{}, nil, err
}
state.Version.Consensus.App = appVersion
// Done! 🎉
s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
@ -545,27 +547,19 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin
return nil
}
// verifyApp verifies the sync, checking the app hash, last block height and app version
func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error {
resp, err := s.connQuery.InfoSync(context.TODO(), proxy.RequestInfo)
// verifyApp verifies the sync, checking the app hash and last block height. It returns the
// app version, which should be returned as part of the initial state.
func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot) (uint64, error) {
resp, err := s.connQuery.InfoSync(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
}
// sanity check that the app version in the block matches the application's own record
// of its version
if resp.AppVersion != appVersion {
// An error here most like means that the app hasn't inplemented state sync
// or the Info call correctly
return fmt.Errorf("app version mismatch. Expected: %d, got: %d",
appVersion, resp.AppVersion)
return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err)
}
if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
s.logger.Error("appHash verification failed",
"expected", snapshot.trustedAppHash,
"actual", resp.LastBlockAppHash)
return errVerifyFailed
return 0, errVerifyFailed
}
if uint64(resp.LastBlockHeight) != snapshot.Height {
@ -574,9 +568,9 @@ func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error {
"expected", snapshot.Height,
"actual", resp.LastBlockHeight,
)
return errVerifyFailed
return 0, errVerifyFailed
}
s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", snapshot.trustedAppHash)
return nil
return resp.AppVersion, nil
}

+ 13
- 13
internal/statesync/syncer_test.go View File

@ -30,7 +30,7 @@ func TestSyncer_SyncAny(t *testing.T) {
Version: sm.Version{
Consensus: version.Consensus{
Block: version.BlockProtocol,
App: testAppVersion,
App: 0,
},
Software: version.TMVersion,
},
@ -178,7 +178,7 @@ func TestSyncer_SyncAny(t *testing.T) {
Index: 2, Chunk: []byte{1, 1, 2},
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connQuery.On("InfoSync", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: testAppVersion,
AppVersion: 9,
LastBlockHeight: 1,
LastBlockAppHash: []byte("app_hash"),
}, nil)
@ -192,7 +192,10 @@ func TestSyncer_SyncAny(t *testing.T) {
require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
chunkRequestsMtx.Unlock()
// The syncer should have updated the state app version from the ABCI info response.
expectState := state
expectState.Version.Consensus.App = 9
require.Equal(t, expectState, newState)
require.Equal(t, commit, lastCommit)
@ -710,8 +713,6 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
func TestSyncer_verifyApp(t *testing.T) {
boom := errors.New("boom")
appVersion := uint64(9)
appVersionMismatchErr := errors.New("app version mismatch. Expected: 9, got: 2")
s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
testcases := map[string]struct {
@ -722,22 +723,17 @@ func TestSyncer_verifyApp(t *testing.T) {
"verified": {&abci.ResponseInfo{
LastBlockHeight: 3,
LastBlockAppHash: []byte("app_hash"),
AppVersion: appVersion,
AppVersion: 9,
}, nil, nil},
"invalid app version": {&abci.ResponseInfo{
LastBlockHeight: 3,
LastBlockAppHash: []byte("app_hash"),
AppVersion: 2,
}, nil, appVersionMismatchErr},
"invalid height": {&abci.ResponseInfo{
LastBlockHeight: 5,
LastBlockAppHash: []byte("app_hash"),
AppVersion: appVersion,
AppVersion: 9,
}, nil, errVerifyFailed},
"invalid hash": {&abci.ResponseInfo{
LastBlockHeight: 3,
LastBlockAppHash: []byte("xxx"),
AppVersion: appVersion,
AppVersion: 9,
}, nil, errVerifyFailed},
"error": {nil, boom, boom},
}
@ -753,12 +749,16 @@ func TestSyncer_verifyApp(t *testing.T) {
rts := setup(ctx, t, nil, nil, nil, 2)
rts.connQuery.On("InfoSync", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
err := rts.syncer.verifyApp(s, appVersion)
version, err := rts.syncer.verifyApp(ctx, s)
unwrapped := errors.Unwrap(err)
if unwrapped != nil {
err = unwrapped
}
require.Equal(t, tc.expectErr, err)
if err == nil {
require.Equal(t, tc.response.AppVersion, version)
}
})
}
}


+ 1
- 0
node/node.go View File

@ -1034,6 +1034,7 @@ func createAndStartPrivValidatorGRPCClient(
logger log.Logger,
) (types.PrivValidator, error) {
pvsc, err := tmgrpc.DialRemoteSigner(
ctx,
cfg.PrivValidator,
chainID,
logger,


+ 1
- 1
node/setup.go View File

@ -310,7 +310,7 @@ func createConsensusReactor(
)
if privValidator != nil && cfg.Mode == config.ModeValidator {
consensusState.SetPrivValidator(privValidator)
consensusState.SetPrivValidator(ctx, privValidator)
}
csChDesc := consensus.GetChannelDescriptors()


+ 1
- 1
privval/grpc/util.go View File

@ -88,6 +88,7 @@ func GenerateTLS(certPath, keyPath, ca string, log log.Logger) grpc.DialOption {
// DialRemoteSigner is a generalized function to dial the gRPC server.
func DialRemoteSigner(
ctx context.Context,
cfg *config.PrivValidatorConfig,
chainID string,
logger log.Logger,
@ -110,7 +111,6 @@ func DialRemoteSigner(
dialOptions = append(dialOptions, transportSecurity)
ctx := context.TODO()
_, address := tmnet.ProtocolAndAddress(cfg.ListenAddr)
conn, err := grpc.DialContext(ctx, address, dialOptions...)
if err != nil {


+ 3
- 3
privval/signer_server.go View File

@ -60,7 +60,7 @@ func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationReq
ss.validationRequestHandler = validationRequestHandler
}
func (ss *SignerServer) servicePendingRequest() {
func (ss *SignerServer) servicePendingRequest(ctx context.Context) {
if !ss.IsRunning() {
return // Ignore error from closing.
}
@ -78,7 +78,7 @@ func (ss *SignerServer) servicePendingRequest() {
// limit the scope of the lock
ss.handlerMtx.Lock()
defer ss.handlerMtx.Unlock()
res, err = ss.validationRequestHandler(context.TODO(), ss.privVal, req, ss.chainID) // todo
res, err = ss.validationRequestHandler(ctx, ss.privVal, req, ss.chainID) // todo
if err != nil {
// only log the error; we'll reply with an error in res
ss.endpoint.logger.Error("SignerServer: handleMessage", "err", err)
@ -100,7 +100,7 @@ func (ss *SignerServer) serviceLoop(ctx context.Context) {
if err := ss.endpoint.ensureConnection(); err != nil {
return
}
ss.servicePendingRequest()
ss.servicePendingRequest(ctx)
}
}
}

+ 4
- 1
types/evidence_test.go View File

@ -317,6 +317,9 @@ func TestEvidenceProto(t *testing.T) {
}
func TestEvidenceVectors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Votes for duplicateEvidence
val := NewMockPV()
val.PrivKey = ed25519.GenPrivKeyFromSecret([]byte("it's a secret")) // deterministic key
@ -330,7 +333,7 @@ func TestEvidenceVectors(t *testing.T) {
height := int64(5)
commonHeight := height - 1
nValidators := 10
voteSet, valSet, privVals := deterministicVoteSet(height, 1, tmproto.PrecommitType, 1)
voteSet, valSet, privVals := deterministicVoteSet(ctx, height, 1, tmproto.PrecommitType, 1)
header := &Header{
Version: version.Consensus{Block: 1, App: 1},
ChainID: chainID,


+ 2
- 2
types/validator_set_test.go View File

@ -1599,7 +1599,7 @@ func BenchmarkValidatorSet_VerifyCommitLightTrusting_Ed25519(b *testing.B) {
// where each validator has a power of 50
//
// EXPOSED FOR TESTING.
func deterministicValidatorSet() (*ValidatorSet, []PrivValidator) {
func deterministicValidatorSet(ctx context.Context) (*ValidatorSet, []PrivValidator) {
var (
valz = make([]*Validator, 10)
privValidators = make([]PrivValidator, 10)
@ -1607,7 +1607,7 @@ func deterministicValidatorSet() (*ValidatorSet, []PrivValidator) {
for i := 0; i < 10; i++ {
// val, privValidator := DeterministicValidator(ed25519.PrivKey([]byte(deterministicKeys[i])))
val, privValidator := deterministicValidator(ed25519.GenPrivKeyFromSecret([]byte(fmt.Sprintf("key: %x", i))))
val, privValidator := deterministicValidator(ctx, ed25519.GenPrivKeyFromSecret([]byte(fmt.Sprintf("key: %x", i))))
valz[i] = val
privValidators[i] = privValidator
}


+ 2
- 2
types/validator_test.go View File

@ -105,11 +105,11 @@ func TestValidatorValidateBasic(t *testing.T) {
// deterministicValidator returns a deterministic validator, useful for testing.
// UNSTABLE
func deterministicValidator(key crypto.PrivKey) (*Validator, PrivValidator) {
func deterministicValidator(ctx context.Context, key crypto.PrivKey) (*Validator, PrivValidator) {
privVal := NewMockPV()
privVal.PrivKey = key
var votePower int64 = 50
pubKey, err := privVal.GetPubKey(context.TODO())
pubKey, err := privVal.GetPubKey(ctx)
if err != nil {
panic(fmt.Errorf("could not retrieve pubkey %w", err))
}


+ 2
- 1
types/vote_set_test.go View File

@ -489,12 +489,13 @@ func randVoteSet(
}
func deterministicVoteSet(
ctx context.Context,
height int64,
round int32,
signedMsgType tmproto.SignedMsgType,
votingPower int64,
) (*VoteSet, *ValidatorSet, []PrivValidator) {
valSet, privValidators := deterministicValidatorSet()
valSet, privValidators := deterministicValidatorSet(ctx)
return NewVoteSet("test_chain_id", height, round, signedMsgType, valSet), valSet, privValidators
}


Loading…
Cancel
Save