From bef120dadf756f25e8a186c3a2382063565e953f Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 16 Dec 2021 15:15:26 -0500 Subject: [PATCH] contexts: remove all TODO instances (#7466) --- cmd/tendermint/commands/init.go | 6 +-- cmd/tendermint/commands/show_validator.go | 7 +-- cmd/tendermint/commands/testnet.go | 8 ++-- internal/consensus/byzantine_test.go | 8 ++-- internal/consensus/common_test.go | 43 ++++++++++--------- internal/consensus/reactor_test.go | 2 +- internal/consensus/replay.go | 4 +- internal/consensus/replay_file.go | 5 +-- internal/consensus/replay_test.go | 29 +++++++------ internal/consensus/state.go | 29 +++++++------ internal/consensus/state_test.go | 2 +- internal/consensus/wal_generator.go | 2 +- internal/mempool/mempool.go | 4 -- internal/state/execution.go | 7 ++- internal/state/indexer/indexer_service.go | 2 +- internal/state/indexer/sink/null/null_test.go | 7 ++- internal/statesync/reactor.go | 8 ++-- internal/statesync/syncer.go | 30 ++++++------- internal/statesync/syncer_test.go | 26 +++++------ node/node.go | 1 + node/setup.go | 2 +- privval/grpc/util.go | 2 +- privval/signer_server.go | 6 +-- types/evidence_test.go | 5 ++- types/validator_set_test.go | 4 +- types/validator_test.go | 4 +- types/vote_set_test.go | 3 +- 27 files changed, 128 insertions(+), 128 deletions(-) diff --git a/cmd/tendermint/commands/init.go b/cmd/tendermint/commands/init.go index 02e400a0a..6bfde7ad8 100644 --- a/cmd/tendermint/commands/init.go +++ b/cmd/tendermint/commands/init.go @@ -39,10 +39,10 @@ func initFiles(cmd *cobra.Command, args []string) error { return errors.New("must specify a node type: tendermint init [validator|full|seed]") } config.Mode = args[0] - return initFilesWithConfig(config) + return initFilesWithConfig(cmd.Context(), config) } -func initFilesWithConfig(config *cfg.Config) error { +func initFilesWithConfig(ctx context.Context, config *cfg.Config) error { var ( pv *privval.FilePV err error @@ -98,7 +98,7 @@ func initFilesWithConfig(config *cfg.Config) error { } } - ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout) + ctx, cancel := context.WithTimeout(ctx, ctxTimeout) defer cancel() // if this is a validator we add it to genesis diff --git a/cmd/tendermint/commands/show_validator.go b/cmd/tendermint/commands/show_validator.go index 47b372c61..b6b9c67ec 100644 --- a/cmd/tendermint/commands/show_validator.go +++ b/cmd/tendermint/commands/show_validator.go @@ -25,13 +25,14 @@ func showValidator(cmd *cobra.Command, args []string) error { var ( pubKey crypto.PubKey err error + bctx = cmd.Context() ) - //TODO: remove once gRPC is the only supported protocol protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidator.ListenAddr) switch protocol { case "grpc": pvsc, err := tmgrpc.DialRemoteSigner( + bctx, config.PrivValidator, config.ChainID(), logger, @@ -41,7 +42,7 @@ func showValidator(cmd *cobra.Command, args []string) error { return fmt.Errorf("can't connect to remote validator %w", err) } - ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout) + ctx, cancel := context.WithTimeout(bctx, ctxTimeout) defer cancel() pubKey, err = pvsc.GetPubKey(ctx) @@ -60,7 +61,7 @@ func showValidator(cmd *cobra.Command, args []string) error { return err } - ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout) + ctx, cancel := context.WithTimeout(bctx, ctxTimeout) defer cancel() pubKey, err = pv.GetPubKey(ctx) diff --git a/cmd/tendermint/commands/testnet.go b/cmd/tendermint/commands/testnet.go index 95955dd9b..75fdb05c5 100644 --- a/cmd/tendermint/commands/testnet.go +++ b/cmd/tendermint/commands/testnet.go @@ -122,7 +122,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error { } genVals := make([]types.GenesisValidator, nValidators) - + ctx := cmd.Context() for i := 0; i < nValidators; i++ { nodeDirName := fmt.Sprintf("%s%d", nodeDirPrefix, i) nodeDir := filepath.Join(outputDir, nodeDirName) @@ -139,7 +139,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error { return err } - if err := initFilesWithConfig(config); err != nil { + if err := initFilesWithConfig(ctx, config); err != nil { return err } @@ -150,7 +150,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error { return err } - ctx, cancel := context.WithTimeout(context.TODO(), ctxTimeout) + ctx, cancel := context.WithTimeout(ctx, ctxTimeout) defer cancel() pubKey, err := pv.GetPubKey(ctx) @@ -181,7 +181,7 @@ func testnetFiles(cmd *cobra.Command, args []string) error { return err } - if err := initFilesWithConfig(config); err != nil { + if err := initFilesWithConfig(ctx, config); err != nil { return err } } diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index e80b79c84..d9a8e02d3 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -92,7 +92,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { cs := NewState(ctx, logger, thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) // set private validator pv := privVals[i] - cs.SetPrivValidator(pv) + cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) err = eventBus.Start(ctx) @@ -125,14 +125,14 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { bzNodeState.doPrevote = func(ctx context.Context, height int64, round int32) { // allow first height to happen normally so that byzantine validator is no longer proposer if height == prevoteHeight { - prevote1, err := bzNodeState.signVote( + prevote1, err := bzNodeState.signVote(ctx, tmproto.PrevoteType, bzNodeState.ProposalBlock.Hash(), bzNodeState.ProposalBlockParts.Header(), ) require.NoError(t, err) - prevote2, err := bzNodeState.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{}) + prevote2, err := bzNodeState.signVote(ctx, tmproto.PrevoteType, nil, types.PartSetHeader{}) require.NoError(t, err) // send two votes to all peers (1st to one half, 2nd to another half) @@ -172,7 +172,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // lazyProposer := states[1] lazyNodeState := states[1] - lazyNodeState.decideProposal = func(height int64, round int32) { + lazyNodeState.decideProposal = func(ctx context.Context, height int64, round int32) { lazyNodeState.logger.Info("Lazy Proposer proposing condensed commit") require.NotNil(t, lazyNodeState.privValidator) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index ec031b23b..57f841945 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -8,6 +8,7 @@ import ( "os" "path" "path/filepath" + "sort" "sync" "testing" "time" @@ -195,28 +196,28 @@ func (vss ValidatorStubsByPower) Len() int { return len(vss) } -func (vss ValidatorStubsByPower) Less(i, j int) bool { - vssi, err := vss[i].GetPubKey(context.TODO()) - if err != nil { - panic(err) - } - vssj, err := vss[j].GetPubKey(context.TODO()) - if err != nil { - panic(err) - } +func sortVValidatorStubsByPower(ctx context.Context, vss []*validatorStub) []*validatorStub { + sort.Slice(vss, func(i, j int) bool { + vssi, err := vss[i].GetPubKey(ctx) + if err != nil { + panic(err) + } + vssj, err := vss[j].GetPubKey(ctx) + if err != nil { + panic(err) + } - if vss[i].VotingPower == vss[j].VotingPower { - return bytes.Compare(vssi.Address(), vssj.Address()) == -1 + if vss[i].VotingPower == vss[j].VotingPower { + return bytes.Compare(vssi.Address(), vssj.Address()) == -1 + } + return vss[i].VotingPower > vss[j].VotingPower + }) + + for idx, vs := range vss { + vs.Index = int32(idx) } - return vss[i].VotingPower > vss[j].VotingPower -} -func (vss ValidatorStubsByPower) Swap(i, j int) { - it := vss[i] - vss[i] = vss[j] - vss[i].Index = int32(i) - vss[j] = it - vss[j].Index = int32(j) + return vss } //------------------------------------------------------------------------------- @@ -475,7 +476,7 @@ func newStateWithConfigAndBlockStore( mempool, evpool, ) - cs.SetPrivValidator(pv) + cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(logger.With("module", "events")) err := eventBus.Start(ctx) @@ -814,7 +815,7 @@ func randConsensusState( func randConsensusNetWithPeers( ctx context.Context, cfg *config.Config, - nValidators, + nValidators int, nPeers int, testName string, tickerFunc func() TimeoutTicker, diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 887f9fd3e..26d9c1482 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -422,7 +422,7 @@ func TestReactorWithEvidence(t *testing.T) { blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore) cs := NewState(ctx, logger.With("validator", i, "module", "consensus"), thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2) - cs.SetPrivValidator(pv) + cs.SetPrivValidator(ctx, pv) eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) require.NoError(t, eventBus.Start(ctx)) diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 3ee88337d..d5ba21023 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -50,9 +50,9 @@ func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, ne cs.logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks if newStepSub != nil { - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctxto, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - stepMsg, err := newStepSub.Next(ctx) + stepMsg, err := newStepSub.Next(ctxto) if errors.Is(err, context.DeadlineExceeded) { return fmt.Errorf("subscription timed out: %w", err) } else if err != nil { diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index b9a14ff50..d53ea505f 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -91,7 +91,7 @@ func (cs *State) ReplayFile(ctx context.Context, file string, console bool) erro var msg *TimedWALMessage for { if nextN == 0 && console { - nextN, err = pb.replayConsoleLoop() + nextN, err = pb.replayConsoleLoop(ctx) if err != nil { return err } @@ -187,7 +187,7 @@ func (cs *State) startForReplay() { // console function for parsing input and running commands. The integer // return value is invalid unless the error is nil. -func (pb *playback) replayConsoleLoop() (int, error) { +func (pb *playback) replayConsoleLoop(ctx context.Context) (int, error) { for { fmt.Printf("> ") bufReader := bufio.NewReader(os.Stdin) @@ -225,7 +225,6 @@ func (pb *playback) replayConsoleLoop() (int, error) { // NOTE: "back" is not supported in the state machine design, // so we restart and replay up to - ctx := context.TODO() // ensure all new step events are regenerated as expected newStepSub, err := pb.cs.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 76e020c22..ce739e116 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -10,7 +10,6 @@ import ( "os" "path/filepath" "runtime" - "sort" "testing" "time" @@ -93,9 +92,9 @@ func startNewStateAndWaitForBlock(ctx context.Context, t *testing.T, consensusRe Query: types.EventQueryNewBlock, }) require.NoError(t, err) - ctx, cancel := context.WithTimeout(ctx, 120*time.Second) + ctxto, cancel := context.WithTimeout(ctx, 120*time.Second) defer cancel() - _, err = newBlockSub.Next(ctx) + _, err = newBlockSub.Next(ctxto) if errors.Is(err, context.DeadlineExceeded) { t.Fatal("Timed out waiting for new block (see trace above)") } else if err != nil { @@ -148,7 +147,7 @@ func TestWALCrash(t *testing.T) { } } -func crashWALandCheckLiveness(ctx context.Context, t *testing.T, consensusReplayConfig *config.Config, +func crashWALandCheckLiveness(rctx context.Context, t *testing.T, consensusReplayConfig *config.Config, initFn func(dbm.DB, *State, context.Context), heightToStop int64) { walPanicked := make(chan error) crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop} @@ -168,7 +167,7 @@ LOOP: require.NoError(t, err) privValidator := loadPrivValidator(consensusReplayConfig) cs := newStateWithConfigAndBlockStore( - ctx, + rctx, logger, consensusReplayConfig, state, @@ -178,7 +177,7 @@ LOOP: ) // start sending transactions - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(rctx) initFn(stateDB, cs, ctx) // clean up WAL file from the previous iteration @@ -342,7 +341,10 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { newMockTickerFunc(true), newPersistentKVStoreWithPath) sim.Config = cfg - sim.GenesisState, _ = sm.MakeGenesisState(genDoc) + + var err error + sim.GenesisState, err = sm.MakeGenesisState(genDoc) + require.NoError(t, err) sim.CleanupFunc = cleanup partSize := types.BlockPartSizeBytes @@ -455,7 +457,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} newVss := make([]*validatorStub, nVals+1) copy(newVss, vss[:nVals+1]) - sort.Sort(ValidatorStubsByPower(newVss)) + newVss = sortVValidatorStubsByPower(ctx, newVss) valIndexFn := func(cssIdx int) int { for i, vs := range newVss { @@ -500,7 +502,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { tmproto.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) } - ensureNewRound(newRoundCh, height+1, 0) // HEIGHT 5 @@ -509,7 +510,8 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { // Reflect the changes to vss[nVals] at height 3 and resort newVss. newVssIdx := valIndexFn(nVals) newVss[newVssIdx].VotingPower = 25 - sort.Sort(ValidatorStubsByPower(newVss)) + newVss = sortVValidatorStubsByPower(ctx, newVss) + selfIndex = valIndexFn(0) ensureNewProposal(proposalCh, height, round) rs = css[0].GetRoundState() @@ -534,7 +536,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()} newVss = make([]*validatorStub, nVals+3) copy(newVss, vss[:nVals+3]) - sort.Sort(ValidatorStubsByPower(newVss)) + newVss = sortVValidatorStubsByPower(ctx, newVss) selfIndex = valIndexFn(0) proposal = types.NewProposal(vss[1].Height, round, -1, blockID) @@ -709,7 +711,7 @@ func tempWALWithData(data []byte) string { // Make some blocks. Start a fresh app and apply nBlocks blocks. // Then restart the app and sync it up with the remaining blocks func testHandshakeReplay( - ctx context.Context, + rctx context.Context, t *testing.T, sim *simulatorTestSuite, nBlocks int, @@ -721,9 +723,8 @@ func testHandshakeReplay( var store *mockBlockStore var stateDB dbm.DB var genesisState sm.State - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) + ctx, cancel := context.WithCancel(rctx) t.Cleanup(cancel) cfg := sim.Config diff --git a/internal/consensus/state.go b/internal/consensus/state.go index e87d058eb..10d0bb975 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -131,7 +131,7 @@ type State struct { nSteps int // some functions can be overwritten for testing - decideProposal func(height int64, round int32) + decideProposal func(ctx context.Context, height int64, round int32) doPrevote func(ctx context.Context, height int64, round int32) setProposal func(proposal *types.Proposal) error @@ -272,7 +272,7 @@ func (cs *State) GetValidators() (int64, []*types.Validator) { // SetPrivValidator sets the private validator account for signing votes. It // immediately requests pubkey and caches it. -func (cs *State) SetPrivValidator(priv types.PrivValidator) { +func (cs *State) SetPrivValidator(ctx context.Context, priv types.PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -298,7 +298,7 @@ func (cs *State) SetPrivValidator(priv types.PrivValidator) { } } - if err := cs.updatePrivValidatorPubKey(); err != nil { + if err := cs.updatePrivValidatorPubKey(ctx); err != nil { cs.logger.Error("failed to get private validator pubkey", "err", err) } } @@ -1207,7 +1207,7 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32) { "proposer", address, ) - cs.decideProposal(height, round) + cs.decideProposal(ctx, height, round) } else { logger.Debug( "propose step; not our turn to propose", @@ -1220,7 +1220,7 @@ func (cs *State) isProposer(address []byte) bool { return bytes.Equal(cs.Validators.GetProposer().Address, address) } -func (cs *State) defaultDecideProposal(height int64, round int32) { +func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round int32) { var block *types.Block var blockParts *types.PartSet @@ -1248,9 +1248,9 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { p := proposal.ToProto() // wait the max amount we would wait for a proposal - ctx, cancel := context.WithTimeout(context.TODO(), cs.config.TimeoutPropose) + ctxto, cancel := context.WithTimeout(ctx, cs.config.TimeoutPropose) defer cancel() - if err := cs.privValidator.SignProposal(ctx, cs.state.ChainID, p); err == nil { + if err := cs.privValidator.SignProposal(ctxto, cs.state.ChainID, p); err == nil { proposal.Signature = p.Signature // send proposal and block parts on internal msg queue @@ -1772,7 +1772,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { fail.Fail() // XXX // Private validator might have changed it's key pair => refetch pubkey. - if err := cs.updatePrivValidatorPubKey(); err != nil { + if err := cs.updatePrivValidatorPubKey(ctx); err != nil { logger.Error("failed to get private validator pubkey", "err", err) } @@ -2250,6 +2250,7 @@ func (cs *State) addVote( // CONTRACT: cs.privValidator is not nil. func (cs *State) signVote( + ctx context.Context, msgType tmproto.SignedMsgType, hash []byte, header types.PartSetHeader, @@ -2292,10 +2293,10 @@ func (cs *State) signVote( timeout = time.Second } - ctx, cancel := context.WithTimeout(context.TODO(), timeout) + ctxto, cancel := context.WithTimeout(ctx, timeout) defer cancel() - err := cs.privValidator.SignVote(ctx, cs.state.ChainID, v) + err := cs.privValidator.SignVote(ctxto, cs.state.ChainID, v) vote.Signature = v.Signature vote.Timestamp = v.Timestamp @@ -2344,7 +2345,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType, } // TODO: pass pubKey to signVote - vote, err := cs.signVote(msgType, hash, header) + vote, err := cs.signVote(ctx, msgType, hash, header) if err == nil { cs.sendInternalMessage(ctx, msgInfo{&VoteMessage{vote}, ""}) cs.logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) @@ -2358,7 +2359,7 @@ func (cs *State) signAddVote(ctx context.Context, msgType tmproto.SignedMsgType, // updatePrivValidatorPubKey get's the private validator public key and // memoizes it. This func returns an error if the private validator is not // responding or responds with an error. -func (cs *State) updatePrivValidatorPubKey() error { +func (cs *State) updatePrivValidatorPubKey(rctx context.Context) error { if cs.privValidator == nil { return nil } @@ -2377,9 +2378,9 @@ func (cs *State) updatePrivValidatorPubKey() error { // set context timeout depending on the configuration and the State step, // this helps in avoiding blocking of the remote signer connection. - ctx, cancel := context.WithTimeout(context.TODO(), timeout) + ctxto, cancel := context.WithTimeout(rctx, timeout) defer cancel() - pubKey, err := cs.privValidator.GetPubKey(ctx) + pubKey, err := cs.privValidator.GetPubKey(ctxto) if err != nil { return err } diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 92caca7a3..fd79c19a6 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -160,7 +160,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) { cs, _, err := randState(ctx, config, log.TestingLogger(), 1) require.NoError(t, err) - cs.SetPrivValidator(nil) + cs.SetPrivValidator(ctx, nil) height, round := cs.Height, cs.Round // Listen for propose timeout event diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index d1130defb..5012da5b1 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -83,7 +83,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, wr io.Writer, numBloc consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { - consensusState.SetPrivValidator(privValidator) + consensusState.SetPrivValidator(ctx, privValidator) } // END OF COPY PASTE diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 82aa3d7c7..e0cb72d0b 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -233,10 +233,6 @@ func (txmp *TxMempool) CheckTx( cb func(*abci.Response), txInfo TxInfo, ) error { - if ctx == nil { - ctx = context.TODO() - } - txmp.mtx.RLock() defer txmp.mtx.RUnlock() diff --git a/internal/state/execution.go b/internal/state/execution.go index 627a6770c..e50b3e97f 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -162,7 +162,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } startTime := time.Now().UnixNano() - abciResponses, err := execBlockOnProxyApp( + abciResponses, err := execBlockOnProxyApp(ctx, blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight, ) endTime := time.Now().UnixNano() @@ -297,6 +297,7 @@ func (blockExec *BlockExecutor) Commit( // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set func execBlockOnProxyApp( + ctx context.Context, logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block, @@ -337,8 +338,6 @@ func execBlockOnProxyApp( byzVals = append(byzVals, evidence.ABCI()...) } - ctx := context.TODO() - // Begin block var err error pbh := block.Header.ToProto() @@ -584,7 +583,7 @@ func ExecCommitBlock( initialHeight int64, s State, ) ([]byte, error) { - abciResponses, err := execBlockOnProxyApp(logger, appConnConsensus, block, store, initialHeight) + abciResponses, err := execBlockOnProxyApp(ctx, logger, appConnConsensus, block, store, initialHeight) if err != nil { logger.Error("failed executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index f1bf763b2..e73e4a3ba 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -122,7 +122,7 @@ func (is *Service) OnStart(ctx context.Context) error { // If the event sinks support indexing, register an observer to capture // block header data for the indexer. if IndexingEnabled(is.eventSinks) { - err := is.eventBus.Observe(context.TODO(), is.publish, + err := is.eventBus.Observe(ctx, is.publish, types.EventQueryNewBlockHeader, types.EventQueryTx) if err != nil { return err diff --git a/internal/state/indexer/sink/null/null_test.go b/internal/state/indexer/sink/null/null_test.go index 15b77dc55..0cb5a552c 100644 --- a/internal/state/indexer/sink/null/null_test.go +++ b/internal/state/indexer/sink/null/null_test.go @@ -10,14 +10,17 @@ import ( ) func TestNullEventSink(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nullIndexer := NewEventSink() assert.Nil(t, nullIndexer.IndexTxEvents(nil)) assert.Nil(t, nullIndexer.IndexBlockEvents(types.EventDataNewBlockHeader{})) - val1, err1 := nullIndexer.SearchBlockEvents(context.TODO(), nil) + val1, err1 := nullIndexer.SearchBlockEvents(ctx, nil) assert.Nil(t, val1) assert.Nil(t, err1) - val2, err2 := nullIndexer.SearchTxEvents(context.TODO(), nil) + val2, err2 := nullIndexer.SearchTxEvents(ctx, nil) assert.Nil(t, val2) assert.Nil(t, err2) val3, err3 := nullIndexer.GetTxByHash(nil) diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index b161225a8..8f39f54bc 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -508,7 +508,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel switch msg := envelope.Message.(type) { case *ssproto.SnapshotsRequest: - snapshots, err := r.recentSnapshots(recentSnapshots) + snapshots, err := r.recentSnapshots(ctx, recentSnapshots) if err != nil { logger.Error("failed to fetch snapshots", "err", err) return nil @@ -585,7 +585,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope "chunk", msg.Index, "peer", envelope.From, ) - resp, err := r.conn.LoadSnapshotChunkSync(context.TODO(), abci.RequestLoadSnapshotChunk{ + resp, err := r.conn.LoadSnapshotChunkSync(ctx, abci.RequestLoadSnapshotChunk{ Height: msg.Height, Format: msg.Format, Chunk: msg.Index, @@ -875,8 +875,8 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { } // recentSnapshots fetches the n most recent snapshots from the app -func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) { - resp, err := r.conn.ListSnapshotsSync(context.TODO(), abci.RequestListSnapshots{}) +func (r *Reactor) recentSnapshots(ctx context.Context, n uint32) ([]*snapshot, error) { + resp, err := r.conn.ListSnapshotsSync(ctx, abci.RequestListSnapshots{}) if err != nil { return nil, err } diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index d998c89e2..de80fe1fa 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -348,10 +348,12 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu return sm.State{}, nil, err } - // Verify app and app version - if err := s.verifyApp(snapshot, state.Version.Consensus.App); err != nil { + // Verify app and update app version + appVersion, err := s.verifyApp(ctx, snapshot) + if err != nil { return sm.State{}, nil, err } + state.Version.Consensus.App = appVersion // Done! 🎉 s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format, @@ -545,27 +547,19 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin return nil } -// verifyApp verifies the sync, checking the app hash, last block height and app version -func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error { - resp, err := s.connQuery.InfoSync(context.TODO(), proxy.RequestInfo) +// verifyApp verifies the sync, checking the app hash and last block height. It returns the +// app version, which should be returned as part of the initial state. +func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot) (uint64, error) { + resp, err := s.connQuery.InfoSync(ctx, proxy.RequestInfo) if err != nil { - return fmt.Errorf("failed to query ABCI app for appHash: %w", err) - } - - // sanity check that the app version in the block matches the application's own record - // of its version - if resp.AppVersion != appVersion { - // An error here most like means that the app hasn't inplemented state sync - // or the Info call correctly - return fmt.Errorf("app version mismatch. Expected: %d, got: %d", - appVersion, resp.AppVersion) + return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err) } if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) { s.logger.Error("appHash verification failed", "expected", snapshot.trustedAppHash, "actual", resp.LastBlockAppHash) - return errVerifyFailed + return 0, errVerifyFailed } if uint64(resp.LastBlockHeight) != snapshot.Height { @@ -574,9 +568,9 @@ func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error { "expected", snapshot.Height, "actual", resp.LastBlockHeight, ) - return errVerifyFailed + return 0, errVerifyFailed } s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", snapshot.trustedAppHash) - return nil + return resp.AppVersion, nil } diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index ffcfa4790..3126a7688 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -30,7 +30,7 @@ func TestSyncer_SyncAny(t *testing.T) { Version: sm.Version{ Consensus: version.Consensus{ Block: version.BlockProtocol, - App: testAppVersion, + App: 0, }, Software: version.TMVersion, }, @@ -178,7 +178,7 @@ func TestSyncer_SyncAny(t *testing.T) { Index: 2, Chunk: []byte{1, 1, 2}, }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) connQuery.On("InfoSync", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{ - AppVersion: testAppVersion, + AppVersion: 9, LastBlockHeight: 1, LastBlockAppHash: []byte("app_hash"), }, nil) @@ -192,7 +192,10 @@ func TestSyncer_SyncAny(t *testing.T) { require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests) chunkRequestsMtx.Unlock() + // The syncer should have updated the state app version from the ABCI info response. expectState := state + expectState.Version.Consensus.App = 9 + require.Equal(t, expectState, newState) require.Equal(t, commit, lastCommit) @@ -710,8 +713,6 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { func TestSyncer_verifyApp(t *testing.T) { boom := errors.New("boom") - appVersion := uint64(9) - appVersionMismatchErr := errors.New("app version mismatch. Expected: 9, got: 2") s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")} testcases := map[string]struct { @@ -722,22 +723,17 @@ func TestSyncer_verifyApp(t *testing.T) { "verified": {&abci.ResponseInfo{ LastBlockHeight: 3, LastBlockAppHash: []byte("app_hash"), - AppVersion: appVersion, + AppVersion: 9, }, nil, nil}, - "invalid app version": {&abci.ResponseInfo{ - LastBlockHeight: 3, - LastBlockAppHash: []byte("app_hash"), - AppVersion: 2, - }, nil, appVersionMismatchErr}, "invalid height": {&abci.ResponseInfo{ LastBlockHeight: 5, LastBlockAppHash: []byte("app_hash"), - AppVersion: appVersion, + AppVersion: 9, }, nil, errVerifyFailed}, "invalid hash": {&abci.ResponseInfo{ LastBlockHeight: 3, LastBlockAppHash: []byte("xxx"), - AppVersion: appVersion, + AppVersion: 9, }, nil, errVerifyFailed}, "error": {nil, boom, boom}, } @@ -753,12 +749,16 @@ func TestSyncer_verifyApp(t *testing.T) { rts := setup(ctx, t, nil, nil, nil, 2) rts.connQuery.On("InfoSync", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err) - err := rts.syncer.verifyApp(s, appVersion) + version, err := rts.syncer.verifyApp(ctx, s) unwrapped := errors.Unwrap(err) if unwrapped != nil { err = unwrapped } + require.Equal(t, tc.expectErr, err) + if err == nil { + require.Equal(t, tc.response.AppVersion, version) + } }) } } diff --git a/node/node.go b/node/node.go index 5de408945..3b0e3668b 100644 --- a/node/node.go +++ b/node/node.go @@ -1034,6 +1034,7 @@ func createAndStartPrivValidatorGRPCClient( logger log.Logger, ) (types.PrivValidator, error) { pvsc, err := tmgrpc.DialRemoteSigner( + ctx, cfg.PrivValidator, chainID, logger, diff --git a/node/setup.go b/node/setup.go index 5e626a117..6408b9456 100644 --- a/node/setup.go +++ b/node/setup.go @@ -310,7 +310,7 @@ func createConsensusReactor( ) if privValidator != nil && cfg.Mode == config.ModeValidator { - consensusState.SetPrivValidator(privValidator) + consensusState.SetPrivValidator(ctx, privValidator) } csChDesc := consensus.GetChannelDescriptors() diff --git a/privval/grpc/util.go b/privval/grpc/util.go index a73fd65b1..0361139da 100644 --- a/privval/grpc/util.go +++ b/privval/grpc/util.go @@ -88,6 +88,7 @@ func GenerateTLS(certPath, keyPath, ca string, log log.Logger) grpc.DialOption { // DialRemoteSigner is a generalized function to dial the gRPC server. func DialRemoteSigner( + ctx context.Context, cfg *config.PrivValidatorConfig, chainID string, logger log.Logger, @@ -110,7 +111,6 @@ func DialRemoteSigner( dialOptions = append(dialOptions, transportSecurity) - ctx := context.TODO() _, address := tmnet.ProtocolAndAddress(cfg.ListenAddr) conn, err := grpc.DialContext(ctx, address, dialOptions...) if err != nil { diff --git a/privval/signer_server.go b/privval/signer_server.go index e1235d5f3..e98d78b75 100644 --- a/privval/signer_server.go +++ b/privval/signer_server.go @@ -60,7 +60,7 @@ func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationReq ss.validationRequestHandler = validationRequestHandler } -func (ss *SignerServer) servicePendingRequest() { +func (ss *SignerServer) servicePendingRequest(ctx context.Context) { if !ss.IsRunning() { return // Ignore error from closing. } @@ -78,7 +78,7 @@ func (ss *SignerServer) servicePendingRequest() { // limit the scope of the lock ss.handlerMtx.Lock() defer ss.handlerMtx.Unlock() - res, err = ss.validationRequestHandler(context.TODO(), ss.privVal, req, ss.chainID) // todo + res, err = ss.validationRequestHandler(ctx, ss.privVal, req, ss.chainID) // todo if err != nil { // only log the error; we'll reply with an error in res ss.endpoint.logger.Error("SignerServer: handleMessage", "err", err) @@ -100,7 +100,7 @@ func (ss *SignerServer) serviceLoop(ctx context.Context) { if err := ss.endpoint.ensureConnection(); err != nil { return } - ss.servicePendingRequest() + ss.servicePendingRequest(ctx) } } } diff --git a/types/evidence_test.go b/types/evidence_test.go index 5110bcb1d..06c118d61 100644 --- a/types/evidence_test.go +++ b/types/evidence_test.go @@ -317,6 +317,9 @@ func TestEvidenceProto(t *testing.T) { } func TestEvidenceVectors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Votes for duplicateEvidence val := NewMockPV() val.PrivKey = ed25519.GenPrivKeyFromSecret([]byte("it's a secret")) // deterministic key @@ -330,7 +333,7 @@ func TestEvidenceVectors(t *testing.T) { height := int64(5) commonHeight := height - 1 nValidators := 10 - voteSet, valSet, privVals := deterministicVoteSet(height, 1, tmproto.PrecommitType, 1) + voteSet, valSet, privVals := deterministicVoteSet(ctx, height, 1, tmproto.PrecommitType, 1) header := &Header{ Version: version.Consensus{Block: 1, App: 1}, ChainID: chainID, diff --git a/types/validator_set_test.go b/types/validator_set_test.go index 87008bb1c..dd616e0b8 100644 --- a/types/validator_set_test.go +++ b/types/validator_set_test.go @@ -1599,7 +1599,7 @@ func BenchmarkValidatorSet_VerifyCommitLightTrusting_Ed25519(b *testing.B) { // where each validator has a power of 50 // // EXPOSED FOR TESTING. -func deterministicValidatorSet() (*ValidatorSet, []PrivValidator) { +func deterministicValidatorSet(ctx context.Context) (*ValidatorSet, []PrivValidator) { var ( valz = make([]*Validator, 10) privValidators = make([]PrivValidator, 10) @@ -1607,7 +1607,7 @@ func deterministicValidatorSet() (*ValidatorSet, []PrivValidator) { for i := 0; i < 10; i++ { // val, privValidator := DeterministicValidator(ed25519.PrivKey([]byte(deterministicKeys[i]))) - val, privValidator := deterministicValidator(ed25519.GenPrivKeyFromSecret([]byte(fmt.Sprintf("key: %x", i)))) + val, privValidator := deterministicValidator(ctx, ed25519.GenPrivKeyFromSecret([]byte(fmt.Sprintf("key: %x", i)))) valz[i] = val privValidators[i] = privValidator } diff --git a/types/validator_test.go b/types/validator_test.go index 872dee820..06ebc5686 100644 --- a/types/validator_test.go +++ b/types/validator_test.go @@ -105,11 +105,11 @@ func TestValidatorValidateBasic(t *testing.T) { // deterministicValidator returns a deterministic validator, useful for testing. // UNSTABLE -func deterministicValidator(key crypto.PrivKey) (*Validator, PrivValidator) { +func deterministicValidator(ctx context.Context, key crypto.PrivKey) (*Validator, PrivValidator) { privVal := NewMockPV() privVal.PrivKey = key var votePower int64 = 50 - pubKey, err := privVal.GetPubKey(context.TODO()) + pubKey, err := privVal.GetPubKey(ctx) if err != nil { panic(fmt.Errorf("could not retrieve pubkey %w", err)) } diff --git a/types/vote_set_test.go b/types/vote_set_test.go index 542665e2d..e6d4605b8 100644 --- a/types/vote_set_test.go +++ b/types/vote_set_test.go @@ -489,12 +489,13 @@ func randVoteSet( } func deterministicVoteSet( + ctx context.Context, height int64, round int32, signedMsgType tmproto.SignedMsgType, votingPower int64, ) (*VoteSet, *ValidatorSet, []PrivValidator) { - valSet, privValidators := deterministicValidatorSet() + valSet, privValidators := deterministicValidatorSet(ctx) return NewVoteSet("test_chain_id", height, round, signedMsgType, valSet), valSet, privValidators }