diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go index 0306a31c0..1cb8cca40 100644 --- a/internal/blocksync/pool_test.go +++ b/internal/blocksync/pool_test.go @@ -86,7 +86,7 @@ func TestBlockPoolBasic(t *testing.T) { peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(log.TestingLogger(), start, requestsCh, errorsCh) + pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh) if err := pool.Start(ctx); err != nil { t.Error(err) @@ -138,7 +138,7 @@ func TestBlockPoolTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() start := int64(42) peers := makePeers(10, start+1, 1000) @@ -207,7 +207,7 @@ func TestBlockPoolRemovePeer(t *testing.T) { requestsCh := make(chan BlockRequest) errorsCh := make(chan peerError) - pool := NewBlockPool(log.TestingLogger(), 1, requestsCh, errorsCh) + pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh) err := pool.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); pool.Wait() }) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 89c8c642b..b9c4e498c 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -80,15 +80,7 @@ type Reactor struct { blockSync *atomicBool blockSyncCh *p2p.Channel - // blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope - // messages that the reactor will consume in processBlockSyncCh and receiving messages - // from the peer updates channel and other goroutines. We do this instead of directly - // sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines - // send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh - // may close the blockSyncCh.Out channel at the same time that other goroutines send to - // blockSyncCh.Out. - blockSyncOutBridgeCh chan p2p.Envelope - peerUpdates *p2p.PeerUpdates + peerUpdates *p2p.PeerUpdates requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -119,17 +111,16 @@ func NewReactor( } r := &Reactor{ - logger: logger, - stateStore: stateStore, - blockExec: blockExec, - store: store, - consReactor: consReactor, - blockSync: newAtomicBool(blockSync), - blockSyncCh: blockSyncCh, - blockSyncOutBridgeCh: make(chan p2p.Envelope), - peerUpdates: peerUpdates, - metrics: metrics, - eventBus: eventBus, + logger: logger, + stateStore: stateStore, + blockExec: blockExec, + store: store, + consReactor: consReactor, + blockSync: newAtomicBool(blockSync), + blockSyncCh: blockSyncCh, + peerUpdates: peerUpdates, + metrics: metrics, + eventBus: eventBus, } r.BaseService = *service.NewBaseService(logger, "BlockSync", r) @@ -175,7 +166,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { } go r.processBlockSyncCh(ctx) - go r.processBlockSyncBridge(ctx) go r.processPeerUpdates(ctx) return nil @@ -306,21 +296,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) { } } -func (r *Reactor) processBlockSyncBridge(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case envelope := <-r.blockSyncOutBridgeCh: - if err := r.blockSyncCh.Send(ctx, envelope); err != nil { - return - } - } - } -} - // processPeerUpdate processes a PeerUpdate. -func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) // XXX: Pool#RedoRequest can sometimes give us an empty peer. @@ -331,12 +308,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockSyncOutBridgeCh <- p2p.Envelope{ + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), Height: r.store.Height(), }, + }); err != nil { + r.pool.RemovePeer(peerUpdate.NodeID) + if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + NodeID: peerUpdate.NodeID, + Err: err, + }); err != nil { + return + } } case p2p.PeerStatusDown: @@ -353,7 +338,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) { case <-ctx.Done(): return case peerUpdate := <-r.peerUpdates.Updates(): - r.processPeerUpdate(peerUpdate) + r.processPeerUpdate(ctx, peerUpdate) } } } @@ -372,7 +357,6 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error { r.syncStartTime = time.Now() go r.requestRoutine(ctx) - go r.poolRoutine(ctx, true) return nil @@ -387,15 +371,17 @@ func (r *Reactor) requestRoutine(ctx context.Context) { case <-ctx.Done(): return case request := <-r.requestsCh: - select { - case <-ctx.Done(): - return - case r.blockSyncOutBridgeCh <- p2p.Envelope{ + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, - }: + }); err != nil { + if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ + NodeID: request.PeerID, + Err: err, + }); err != nil { + return + } } - case pErr := <-r.errorsCh: if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: pErr.peerID, @@ -404,16 +390,12 @@ func (r *Reactor) requestRoutine(ctx context.Context) { return } case <-statusUpdateTicker.C: - go func() { - select { - case <-ctx.Done(): - return - case r.blockSyncOutBridgeCh <- p2p.Envelope{ - Broadcast: true, - Message: &bcproto.StatusRequest{}, - }: - } - }() + if err := r.blockSyncCh.Send(ctx, p2p.Envelope{ + Broadcast: true, + Message: &bcproto.StatusRequest{}, + }); err != nil { + return + } } } } diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 14264e040..5c3c797ce 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -62,7 +62,7 @@ func setup( "must specify at least one block height (nodes)") rts := &reactorTestSuite{ - logger: log.TestingLogger().With("module", "block_sync", "testCase", t.Name()), + logger: log.NewNopLogger().With("module", "block_sync", "testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), nodes: make([]types.NodeID, 0, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes), @@ -108,7 +108,7 @@ func (rts *reactorTestSuite) addNode( ) { t.Helper() - logger := log.TestingLogger() + logger := log.NewNopLogger() rts.nodes = append(rts.nodes, nodeID) rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics()) @@ -139,7 +139,7 @@ func (rts *reactorTestSuite) addNode( blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), + log.NewNopLogger(), rts.app[nodeID], mp, sm.EmptyEvidencePool{}, diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 221baf3e1..dfeb556fe 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -79,7 +79,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make Mempool mempool := mempool.NewTxMempool( - log.TestingLogger().With("module", "mempool"), + log.NewNopLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, ) @@ -87,7 +87,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { mempool.EnableTxsAvailable() } - eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) + eventBus := eventbus.NewDefault(log.NewNopLogger().With("module", "events")) require.NoError(t, eventBus.Start(ctx)) // Make a full instance of the evidence pool @@ -95,7 +95,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { evpool := evidence.NewPool(logger.With("module", "evidence"), evidenceDB, stateStore, blockStore, evidence.NopMetrics(), eventBus) // Make State - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) require.NoError(t, err) // set private validator diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index b0f22e54f..161594021 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -777,7 +777,7 @@ func ensureMessageBeforeTimeout(t *testing.T, ch <-chan tmpubsub.Message, to tim // consensusLogger is a TestingLogger which uses a different // color for each validator ("validator" key must exist). func consensusLogger() log.Logger { - return log.TestingLogger().With("module", "consensus") + return log.NewNopLogger().With("module", "consensus") } func makeConsensusState( @@ -885,8 +885,11 @@ func randConsensusNetWithPeers( app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i))) vals := types.TM2PB.ValidatorUpdates(state.Validators) - if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok { - // simulate handshake, receive app version. If don't do this, replay test will fail + switch app.(type) { + // simulate handshake, receive app version. If don't do this, replay test will fail + case *kvstore.PersistentKVStoreApplication: + state.Version.Consensus.App = kvstore.ProtocolVersion + case *kvstore.Application: state.Version.Consensus.App = kvstore.ProtocolVersion } app.InitChain(abci.RequestInitChain{Validators: vals}) @@ -973,10 +976,6 @@ func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application { return kvstore.NewApplication() } -func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application { - return kvstore.NewPersistentKVStoreApplication(logger, dbDir) -} - func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool { if v1 == nil || v2 == nil { return false diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index 3c1dc8064..9996631ab 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -43,7 +43,7 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{ Validators: 1, Power: 10}) - cs := newStateWithConfig(ctx, t, log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(t, cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock) @@ -70,7 +70,7 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{ Validators: 1, Power: 10}) - cs := newStateWithConfig(ctx, t, log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(t, cs.txNotifier).EnableTxsAvailable() @@ -95,7 +95,7 @@ func TestMempoolProgressInHigherRound(t *testing.T) { state, privVals := makeGenesisState(ctx, t, baseConfig, genesisStateArgs{ Validators: 1, Power: 10}) - cs := newStateWithConfig(ctx, t, log.TestingLogger(), config, state, privVals[0], NewCounterApplication()) + cs := newStateWithConfig(ctx, t, log.NewNopLogger(), config, state, privVals[0], NewCounterApplication()) assertMempool(t, cs.txNotifier).EnableTxsAvailable() height, round := cs.Height, cs.Round newBlockCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlock) @@ -142,7 +142,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { defer cancel() config := configSetup(t) - logger := log.TestingLogger() + logger := log.NewNopLogger() state, privVals := makeGenesisState(ctx, t, config, genesisStateArgs{ Validators: 1, Power: 10}) @@ -184,7 +184,7 @@ func TestMempoolRmBadTx(t *testing.T) { app := NewCounterApplication() stateStore := sm.NewStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB()) - cs := newStateWithConfigAndBlockStore(ctx, t, log.TestingLogger(), config, state, privVals[0], app, blockStore) + cs := newStateWithConfigAndBlockStore(ctx, t, log.NewNopLogger(), config, state, privVals[0], app, blockStore) err := stateStore.Save(state) require.NoError(t, err) diff --git a/internal/consensus/pbts_test.go b/internal/consensus/pbts_test.go index 1cb6892c4..c5a5ac535 100644 --- a/internal/consensus/pbts_test.go +++ b/internal/consensus/pbts_test.go @@ -114,7 +114,7 @@ func newPBTSTestHarness(ctx context.Context, t *testing.T, tc pbtsTestConfigurat Time: tc.genesisTime, Validators: validators, }) - cs := newState(ctx, t, log.TestingLogger(), state, privVals[0], kvstore.NewApplication()) + cs := newState(ctx, t, log.NewNopLogger(), state, privVals[0], kvstore.NewApplication()) vss := make([]*validatorStub, validators) for i := 0; i < validators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i)) diff --git a/internal/consensus/peer_state_test.go b/internal/consensus/peer_state_test.go index 06f49508a..2b5712455 100644 --- a/internal/consensus/peer_state_test.go +++ b/internal/consensus/peer_state_test.go @@ -10,7 +10,7 @@ import ( ) func peerStateSetup(h, r, v int) *PeerState { - ps := NewPeerState(log.TestingLogger(), "testPeerState") + ps := NewPeerState(log.NewNopLogger(), "testPeerState") ps.PRS.Height = int64(h) ps.PRS.Round = int32(r) ps.ensureVoteBitArrays(int64(h), v) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 822e8c627..c8d296ff9 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -119,6 +119,7 @@ type Reactor struct { mtx sync.RWMutex peers map[types.NodeID]*PeerState waitSync bool + rs *cstypes.RoundState readySignal chan struct{} // closed when the node is ready to start consensus stateCh *p2p.Channel @@ -166,6 +167,7 @@ func NewReactor( logger: logger, state: cs, waitSync: waitSync, + rs: cs.GetRoundState(), peers: make(map[types.NodeID]*PeerState), eventBus: eventBus, Metrics: metrics, @@ -199,6 +201,7 @@ func (r *Reactor) OnStart(ctx context.Context) error { go r.peerStatsRoutine(ctx) r.subscribeToBroadcastEvents() + go r.updateRoundStateRoutine() if !r.WaitSync() { if err := r.state.Start(ctx); err != nil { @@ -348,6 +351,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) // internal pubsub defined in the consensus state to broadcast them to peers // upon receiving. func (r *Reactor) subscribeToBroadcastEvents() { + onStopCh := r.state.getOnStopCh() + err := r.state.evsw.AddListenerForEvent( listenerIDConsensus, types.EventNewRoundStepValue, @@ -356,7 +361,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { return err } select { - case r.state.onStopCh <- data.(*cstypes.RoundState): + case onStopCh <- data.(*cstypes.RoundState): return nil case <-ctx.Done(): return ctx.Err() @@ -405,10 +410,30 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error { return r.stateCh.Send(ctx, p2p.Envelope{ To: peerID, - Message: makeRoundStepMessage(r.state.GetRoundState()), + Message: makeRoundStepMessage(r.getRoundState()), }) } +func (r *Reactor) updateRoundStateRoutine() { + t := time.NewTicker(100 * time.Microsecond) + defer t.Stop() + for range t.C { + if !r.IsRunning() { + return + } + rs := r.state.GetRoundState() + r.mtx.Lock() + r.rs = rs + r.mtx.Unlock() + } +} + +func (r *Reactor) getRoundState() *cstypes.RoundState { + r.mtx.RLock() + defer r.mtx.RUnlock() + return r.rs +} + func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) { logger := r.logger.With("height", prs.Height).With("peer", ps.peerID) @@ -491,7 +516,7 @@ OUTER_LOOP: default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // Send proposal Block parts? @@ -749,7 +774,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) { default: } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() switch logThrottle { @@ -840,7 +865,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) { return } - rs := r.state.GetRoundState() + rs := r.getRoundState() prs := ps.GetRoundState() // TODO create more reliable coppies of these // structures so the following go routines don't race diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index ef816d85f..b8d638976 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -478,7 +478,7 @@ func TestReactorWithEvidence(t *testing.T) { proxyAppConnCon := abciclient.NewLocalClient(logger, app) mempool := mempool.NewTxMempool( - log.TestingLogger().With("module", "mempool"), + log.NewNopLogger().With("module", "mempool"), thisConfig.Mempool, proxyAppConnMem, ) @@ -501,10 +501,10 @@ func TestReactorWithEvidence(t *testing.T) { evpool2 := sm.EmptyEvidencePool{} - eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events")) + eventBus := eventbus.NewDefault(log.NewNopLogger().With("module", "events")) require.NoError(t, eventBus.Start(ctx)) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index c735e9977..bb3123af2 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -8,7 +8,6 @@ import ( "io" "math/rand" "os" - "path/filepath" "runtime" "testing" "time" @@ -56,7 +55,7 @@ import ( func startNewStateAndWaitForBlock(ctx context.Context, t *testing.T, consensusReplayConfig *config.Config, lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) { - logger := log.TestingLogger() + logger := log.NewNopLogger() state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) require.NoError(t, err) privValidator := loadPrivValidator(t, consensusReplayConfig) @@ -338,13 +337,13 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { nPeers, "replay_test", newMockTickerFunc(true), - newPersistentKVStore) + newEpehemeralKVStore) sim.Config = cfg + defer func() { t.Cleanup(cleanup) }() var err error sim.GenesisState, err = sm.MakeGenesisState(genDoc) require.NoError(t, err) - sim.CleanupFunc = cleanup partSize := types.BlockPartSizeBytes @@ -584,9 +583,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) } - if sim.CleanupFunc != nil { - t.Cleanup(sim.CleanupFunc) - } return sim } @@ -686,7 +682,7 @@ func testHandshakeReplay( cfg := sim.Config - logger := log.TestingLogger() + logger := log.NewNopLogger() if testValidatorsChange { testConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%v_m", t.Name(), mode)) require.NoError(t, err) @@ -743,19 +739,14 @@ func testHandshakeReplay( ) latestAppHash := state.AppHash - // make a new client creator - kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger, - filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int()))) - t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) }) - eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) - clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp) + client := abciclient.NewLocalClient(logger, kvstore.NewApplication()) if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state - proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) @@ -776,7 +767,7 @@ func testHandshakeReplay( genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) - proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.True(t, proxyApp.IsRunning()) require.NotNil(t, proxyApp) @@ -828,7 +819,7 @@ func applyBlock( eventBus *eventbus.EventBus, ) sm.State { testPartSize := types.BlockPartSizeBytes - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mempool, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), appClient, mempool, evpool, blockStore, eventBus) bps, err := blk.MakePartSet(testPartSize) require.NoError(t, err) @@ -905,10 +896,7 @@ func buildTMStateFromChain( t.Helper() // run the whole chain against this client to build up the tendermint state - kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger, - filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))) - defer kvstoreApp.Close() - client := abciclient.NewLocalClient(logger, kvstoreApp) + client := abciclient.NewLocalClient(logger, kvstore.NewApplication()) proxyApp := proxy.New(client, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) @@ -976,7 +964,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { store.chain = blocks - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 00519a884..bd79f4f83 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -452,10 +452,6 @@ func (cs *State) OnStart(ctx context.Context) error { } } - if err := cs.evsw.Start(ctx); err != nil { - return err - } - // Double Signing Risk Reduction if err := cs.checkDoubleSigningRisk(cs.Height); err != nil { return err @@ -497,23 +493,24 @@ func (cs *State) loadWalFile(ctx context.Context) error { return nil } +func (cs *State) getOnStopCh() chan *cstypes.RoundState { + cs.mtx.RLock() + defer cs.mtx.RUnlock() + + return cs.onStopCh +} + // OnStop implements service.Service. func (cs *State) OnStop() { // If the node is committing a new block, wait until it is finished! if cs.GetRoundState().Step == cstypes.RoundStepCommit { select { - case <-cs.onStopCh: + case <-cs.getOnStopCh(): case <-time.After(cs.config.TimeoutCommit): cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit) } } - close(cs.onStopCh) - - if cs.evsw.IsRunning() { - cs.evsw.Stop() - } - if cs.timeoutTicker.IsRunning() { cs.timeoutTicker.Stop() } @@ -937,7 +934,6 @@ func (cs *State) receiveRoutine(ctx context.Context, maxSteps int) { func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { cs.mtx.Lock() defer cs.mtx.Unlock() - var ( added bool err error @@ -954,6 +950,24 @@ func (cs *State) handleMsg(ctx context.Context, mi msgInfo) { case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit added, err = cs.addProposalBlockPart(ctx, msg, peerID) + + // We unlock here to yield to any routines that need to read the the RoundState. + // Previously, this code held the lock from the point at which the final block + // part was received until the block executed against the application. + // This prevented the reactor from being able to retrieve the most updated + // version of the RoundState. The reactor needs the updated RoundState to + // gossip the now completed block. + // + // This code can be further improved by either always operating on a copy + // of RoundState and only locking when switching out State's copy of + // RoundState with the updated copy or by emitting RoundState events in + // more places for routines depending on it to listen for. + cs.mtx.Unlock() + + cs.mtx.Lock() + if added && cs.ProposalBlockParts.IsComplete() { + cs.handleCompleteProposal(ctx, msg.Height) + } if added { select { case cs.statsMsgQueue <- mi: @@ -2156,44 +2170,43 @@ func (cs *State) addProposalBlockPart( if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { cs.logger.Error("failed publishing event complete proposal", "err", err) } + } - // Update Valid* if we can. - prevotes := cs.Votes.Prevotes(cs.Round) - blockID, hasTwoThirds := prevotes.TwoThirdsMajority() - if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { - if cs.ProposalBlock.HashesTo(blockID.Hash) { - cs.logger.Debug( - "updating valid block to new proposal block", - "valid_round", cs.Round, - "valid_block_hash", cs.ProposalBlock.Hash(), - ) + return added, nil +} +func (cs *State) handleCompleteProposal(ctx context.Context, height int64) { + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts - } - // TODO: In case there is +2/3 majority in Prevotes set for some - // block and cs.ProposalBlock contains different block, either - // proposer is faulty or voting power of faulty processes is more - // than 1/3. We should trigger in the future accountability - // procedure at this point. + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts } + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. + } - if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { - // Move onto the next step - cs.enterPrevote(ctx, height, cs.Round) - if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added - cs.enterPrecommit(ctx, height, cs.Round) - } - } else if cs.Step == cstypes.RoundStepCommit { - // If we're waiting on the proposal block... - cs.tryFinalizeCommit(ctx, height) + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(ctx, height, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(ctx, height, cs.Round) } - - return added, nil + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(ctx, height) } - - return added, nil } // Attempt to add the vote. if its a duplicate signature, dupeout the validator diff --git a/internal/consensus/types/height_vote_set.go b/internal/consensus/types/height_vote_set.go index dd8630725..661c5120e 100644 --- a/internal/consensus/types/height_vote_set.go +++ b/internal/consensus/types/height_vote_set.go @@ -199,7 +199,7 @@ func (hvs *HeightVoteSet) SetPeerMaj23( if voteSet == nil { return nil // something we don't know about yet } - return voteSet.SetPeerMaj23(types.P2PID(peerID), blockID) + return voteSet.SetPeerMaj23(string(peerID), blockID) } //--------------------------------------------------------- diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index e99c054c0..b11930f16 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -7,7 +7,6 @@ import ( "fmt" "io" mrand "math/rand" - "path/filepath" "testing" "time" @@ -26,18 +25,17 @@ import ( "github.com/tendermint/tendermint/types" ) -// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a -// stripped down version of node (proxy app, event bus, consensus state) with a -// persistent kvstore application and special consensus wal instance -// (byteBufferWAL) and waits until numBlocks are created. +// WALGenerateNBlocks generates a consensus WAL. It does this by +// spinning up a stripped down version of node (proxy app, event bus, +// consensus state) with a kvstore application and special consensus +// wal instance (byteBufferWAL) and waits until numBlocks are created. // If the node fails to produce given numBlocks, it fails the test. func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) { t.Helper() cfg := getConfig(t) - app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator")) - t.Cleanup(func() { require.NoError(t, app.Close()) }) + app := kvstore.NewApplication() logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks) @@ -82,7 +80,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyApp, mempool, evpool, blockStore, eventBus) consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) if err != nil { t.Fatal(err) diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go index 169b7c327..3aec25093 100644 --- a/internal/consensus/wal_test.go +++ b/internal/consensus/wal_test.go @@ -26,7 +26,7 @@ const walTestFlushInterval = 100 * time.Millisecond func TestWALTruncate(t *testing.T) { walDir := t.TempDir() walFile := filepath.Join(walDir, "wal") - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -108,7 +108,7 @@ func TestWALWrite(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wal, err := NewWAL(ctx, log.TestingLogger(), walFile) + wal, err := NewWAL(ctx, log.NewNopLogger(), walFile) require.NoError(t, err) err = wal.Start(ctx) require.NoError(t, err) @@ -177,7 +177,7 @@ func TestWALPeriodicSync(t *testing.T) { walFile := filepath.Join(walDir, "wal") defer os.RemoveAll(walFile) - wal, err := NewWAL(ctx, log.TestingLogger(), walFile, autofile.GroupCheckDuration(250*time.Millisecond)) + wal, err := NewWAL(ctx, log.NewNopLogger(), walFile, autofile.GroupCheckDuration(250*time.Millisecond)) require.NoError(t, err) wal.SetFlushInterval(walTestFlushInterval) diff --git a/internal/eventbus/event_bus_test.go b/internal/eventbus/event_bus_test.go index 3ef96b80b..1bfea02e1 100644 --- a/internal/eventbus/event_bus_test.go +++ b/internal/eventbus/event_bus_test.go @@ -22,7 +22,7 @@ func TestEventBusPublishEventTx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -75,7 +75,7 @@ func TestEventBusPublishEventTx(t *testing.T) { func TestEventBusPublishEventNewBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -129,7 +129,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -247,7 +247,7 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -297,7 +297,7 @@ func TestEventBusPublishEventEvidenceValidated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -339,7 +339,7 @@ func TestEventBusPublishEventNewEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -381,7 +381,7 @@ func TestEventBusPublish(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) + eventBus := eventbus.NewDefault(log.NewNopLogger()) err := eventBus.Start(ctx) require.NoError(t, err) @@ -467,7 +467,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eventBus := eventbus.NewDefault(log.TestingLogger()) // set buffer capacity to 0 so we are not testing cache + eventBus := eventbus.NewDefault(log.NewNopLogger()) // set buffer capacity to 0 so we are not testing cache err := eventBus.Start(ctx) if err != nil { b.Error(err) diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 664fb7b4e..0b2c2fb3b 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -56,7 +56,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint numStateStores := len(stateStores) rts := &reactorTestSuite{ numStateStores: numStateStores, - logger: log.TestingLogger().With("testCase", t.Name()), + logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}), reactors: make(map[types.NodeID]*evidence.Reactor, numStateStores), pools: make(map[types.NodeID]*evidence.Pool, numStateStores), diff --git a/internal/evidence/verify_test.go b/internal/evidence/verify_test.go index 675c5795a..b7f535657 100644 --- a/internal/evidence/verify_test.go +++ b/internal/evidence/verify_test.go @@ -98,7 +98,7 @@ func TestVerify_LunaticAttackAgainstState(t *testing.T) { blockStore.On("LoadBlockMeta", height).Return(&types.BlockMeta{Header: *trusted.Header}) blockStore.On("LoadBlockCommit", commonHeight).Return(common.Commit) blockStore.On("LoadBlockCommit", height).Return(trusted.Commit) - pool := evidence.NewPool(log.TestingLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) + pool := evidence.NewPool(log.NewNopLogger(), dbm.NewMemDB(), stateStore, blockStore, evidence.NopMetrics(), nil) evList := types.EvidenceList{ev} // check that the evidence pool correctly verifies the evidence diff --git a/internal/inspect/inspect_test.go b/internal/inspect/inspect_test.go index 36bbda802..511dbc814 100644 --- a/internal/inspect/inspect_test.go +++ b/internal/inspect/inspect_test.go @@ -31,7 +31,7 @@ import ( func TestInspectConstructor(t *testing.T) { cfg, err := config.ResetTestRoot(t.TempDir(), "test") require.NoError(t, err) - testLogger := log.TestingLogger() + testLogger := log.NewNopLogger() t.Cleanup(leaktest.Check(t)) defer func() { _ = os.RemoveAll(cfg.RootDir) }() t.Run("from config", func(t *testing.T) { @@ -47,7 +47,7 @@ func TestInspectRun(t *testing.T) { cfg, err := config.ResetTestRoot(t.TempDir(), "test") require.NoError(t, err) - testLogger := log.TestingLogger() + testLogger := log.NewNopLogger() t.Cleanup(leaktest.Check(t)) defer func() { _ = os.RemoveAll(cfg.RootDir) }() t.Run("from config", func(t *testing.T) { @@ -85,7 +85,7 @@ func TestBlock(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -132,7 +132,7 @@ func TestTxSearch(t *testing.T) { Return([]*abcitypes.TxResult{testTxResult}, nil) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -179,7 +179,7 @@ func TestTx(t *testing.T) { }, nil) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -227,7 +227,7 @@ func TestConsensusParams(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -280,7 +280,7 @@ func TestBlockResults(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -328,7 +328,7 @@ func TestCommit(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -382,7 +382,7 @@ func TestBlockByHash(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -435,7 +435,7 @@ func TestBlockchain(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -488,7 +488,7 @@ func TestValidators(t *testing.T) { eventSinkMock.On("Type").Return(indexer.EventSinkType("Mock")) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) @@ -547,7 +547,7 @@ func TestBlockSearch(t *testing.T) { mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). Return([]int64{testHeight}, nil) rpcConfig := config.TestRPCConfig() - l := log.TestingLogger() + l := log.NewNopLogger() d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/libs/autofile/group.go b/internal/libs/autofile/group.go index 0d1806f22..81e16feea 100644 --- a/internal/libs/autofile/group.go +++ b/internal/libs/autofile/group.go @@ -69,11 +69,6 @@ type Group struct { minIndex int // Includes head maxIndex int // Includes head, where Head will move to - // close this when the processTicks routine is done. - // this ensures we can cleanup the dir after calling Stop - // and the routine won't be trying to access it anymore - doneProcessTicks chan struct{} - // TODO: When we start deleting files, we need to start tracking GroupReaders // and their dependencies. } @@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt groupCheckDuration: defaultGroupCheckDuration, minIndex: 0, maxIndex: 0, - doneProcessTicks: make(chan struct{}), } for _, option := range groupOptions { @@ -154,13 +148,6 @@ func (g *Group) OnStop() { } } -// Wait blocks until all internal goroutines are finished. Supposed to be -// called after Stop. -func (g *Group) Wait() { - // wait for processTicks routine to finish - <-g.doneProcessTicks -} - // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { if err := g.FlushAndSync(); err != nil { @@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error { } func (g *Group) processTicks(ctx context.Context) { - defer close(g.doneProcessTicks) - for { select { case <-ctx.Done(): diff --git a/internal/libs/autofile/group_test.go b/internal/libs/autofile/group_test.go index e20604d82..4f5e346c2 100644 --- a/internal/libs/autofile/group_test.go +++ b/internal/libs/autofile/group_test.go @@ -47,7 +47,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() g := createTestGroupWithHeadSizeLimit(ctx, t, logger, 1000*1000) @@ -116,7 +116,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { } func TestRotateFile(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -181,7 +181,7 @@ func TestRotateFile(t *testing.T) { } func TestWrite(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -209,7 +209,7 @@ func TestWrite(t *testing.T) { // test that Read reads the required amount of bytes from all the files in the // group and returns no error if n == size of the given slice. func TestGroupReaderRead(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -247,7 +247,7 @@ func TestGroupReaderRead(t *testing.T) { // test that Read returns an error if number of bytes read < size of // the given slice. Subsequent call should return 0, io.EOF. func TestGroupReaderRead2(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -287,7 +287,7 @@ func TestGroupReaderRead2(t *testing.T) { } func TestMinIndex(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -300,7 +300,7 @@ func TestMinIndex(t *testing.T) { } func TestMaxIndex(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 165e4bd20..0dd6ee00c 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -75,7 +75,7 @@ func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { func setup(ctx context.Context, t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool { t.Helper() - logger := log.TestingLogger() + logger := log.NewNopLogger() cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|")) require.NoError(t, err) diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index dab63af73..64a8adca1 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -139,14 +139,14 @@ func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...ty } wg.Add(1) - go func(pool *TxMempool) { + go func(name types.NodeID, pool *TxMempool) { defer wg.Done() require.Eventually(t, func() bool { return len(txs) == pool.Size() }, time.Minute, 250*time.Millisecond, - "ntx=%d, size=%d", len(txs), pool.Size(), + "node=%q, ntx=%d, size=%d", name, len(txs), pool.Size(), ) - }(pool) + }(name, pool) } wg.Wait() } @@ -196,8 +196,8 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { } func TestReactorBroadcastTxs(t *testing.T) { - numTxs := 1000 - numNodes := 10 + numTxs := 512 + numNodes := 4 ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index e68b7584f..49c53f6fb 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -54,7 +54,7 @@ func TestMConnectionSendFlushStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clientConn := createTestMConnection(log.TestingLogger(), client) + clientConn := createTestMConnection(log.NewNopLogger(), client) err := clientConn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(clientConn)) @@ -91,7 +91,7 @@ func TestMConnectionSend(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createTestMConnection(log.TestingLogger(), client) + mconn := createTestMConnection(log.NewNopLogger(), client) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -133,7 +133,7 @@ func TestMConnectionReceive(t *testing.T) { case <-ctx.Done(): } } - logger := log.TestingLogger() + logger := log.NewNopLogger() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -168,7 +168,7 @@ func TestMConnectionWillEventuallyTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, nil, nil) + mconn := createMConnectionWithCallbacks(log.NewNopLogger(), client, nil, nil) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -224,7 +224,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.NewNopLogger(), client, onReceive, onError) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -282,7 +282,7 @@ func TestMConnectionMultiplePings(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.NewNopLogger(), client, onReceive, onError) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -339,7 +339,7 @@ func TestMConnectionPingPongs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.NewNopLogger(), client, onReceive, onError) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -398,7 +398,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createMConnectionWithCallbacks(log.TestingLogger(), client, onReceive, onError) + mconn := createMConnectionWithCallbacks(log.NewNopLogger(), client, onReceive, onError) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) @@ -433,7 +433,7 @@ func newClientAndServerConnsForReadErrors( {ID: 0x01, Priority: 1, SendQueueCapacity: 1}, {ID: 0x02, Priority: 1, SendQueueCapacity: 1}, } - logger := log.TestingLogger() + logger := log.NewNopLogger() mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig()) err := mconnClient.Start(ctx) @@ -563,7 +563,7 @@ func TestMConnectionTrySend(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mconn := createTestMConnection(log.TestingLogger(), client) + mconn := createTestMConnection(log.NewNopLogger(), client) err := mconn.Start(ctx) require.NoError(t, err) t.Cleanup(waitAll(mconn)) diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 669d92962..fdfc0d45c 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -50,7 +50,7 @@ func (opts *NetworkOptions) setDefaults() { // connects them to each other. func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions) *Network { opts.setDefaults() - logger := log.TestingLogger() + logger := log.NewNopLogger() network := &Network{ Nodes: map[types.NodeID]*Node{}, logger: logger, diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 3293468ae..4044ad569 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -42,7 +42,8 @@ const ( type PeerScore uint8 const ( - PeerScorePersistent PeerScore = math.MaxUint8 // persistent peers + PeerScorePersistent PeerScore = math.MaxUint8 // persistent peers + MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 ) // PeerUpdate is a peer update event sent via PeerUpdates. @@ -1283,6 +1284,9 @@ func (p *peerInfo) Score() PeerScore { } score := p.MutableScore + if score > int64(MaxPeerScoreNotPersistent) { + score = int64(MaxPeerScoreNotPersistent) + } for _, addr := range p.AddressInfo { // DialFailures is reset when dials succeed, so this @@ -1294,10 +1298,6 @@ func (p *peerInfo) Score() PeerScore { return 0 } - if score >= math.MaxUint8 { - return PeerScore(math.MaxUint8) - } - return PeerScore(score) } diff --git a/internal/p2p/peermanager_scoring_test.go b/internal/p2p/peermanager_scoring_test.go index 4c7bef0cc..a45df0b72 100644 --- a/internal/p2p/peermanager_scoring_test.go +++ b/internal/p2p/peermanager_scoring_test.go @@ -80,4 +80,20 @@ func TestPeerScoring(t *testing.T) { time.Millisecond, "startAt=%d score=%d", start, peerManager.Scores()[id]) }) + t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) { + start := int64(peerManager.Scores()[id] + 1) + + for i := start; i <= int64(PeerScorePersistent); i++ { + peerManager.processPeerEvent(ctx, PeerUpdate{ + NodeID: id, + Status: PeerStatusGood, + }) + + if i == int64(PeerScorePersistent) { + require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id]) + } else { + require.EqualValues(t, i, peerManager.Scores()[id]) + } + } + }) } diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index 356d7f435..288755e19 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -303,7 +303,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { return pexCh, nil } - reactor, err := pex.NewReactor(ctx, log.TestingLogger(), peerManager, chCreator, peerUpdates) + reactor, err := pex.NewReactor(ctx, log.NewNopLogger(), peerManager, chCreator, peerUpdates) require.NoError(t, err) require.NoError(t, reactor.Start(ctx)) @@ -365,7 +365,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT realNodes := opts.TotalNodes - opts.MockNodes rts := &reactorTestSuite{ - logger: log.TestingLogger().With("testCase", t.Name()), + logger: log.NewNopLogger().With("testCase", t.Name()), network: p2ptest.MakeNetwork(ctx, t, networkOpts), reactors: make(map[types.NodeID]*pex.Reactor, realNodes), pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes), diff --git a/internal/p2p/router_filter_test.go b/internal/p2p/router_filter_test.go index 8915dc888..217be8d32 100644 --- a/internal/p2p/router_filter_test.go +++ b/internal/p2p/router_filter_test.go @@ -15,7 +15,7 @@ import ( func TestConnectionFiltering(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() filterByIPCount := 0 router := &Router{ diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 61bdd2f89..6142dc45f 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -107,7 +107,7 @@ func TestRouter_Channel_Basic(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -410,7 +410,7 @@ func TestRouter_AcceptPeers(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -465,7 +465,7 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -503,7 +503,7 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -555,7 +555,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -659,7 +659,7 @@ func TestRouter_DialPeers(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -745,7 +745,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -820,7 +820,7 @@ func TestRouter_EvictPeers(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -883,7 +883,7 @@ func TestRouter_ChannelCompatability(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, @@ -939,7 +939,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { router, err := p2p.NewRouter( ctx, - log.TestingLogger(), + log.NewNopLogger(), p2p.NopMetrics(), selfInfo, selfKey, diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index a25278933..423f4e930 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -20,7 +20,7 @@ import ( func init() { testTransports["mconn"] = func(t *testing.T) p2p.Transport { transport := p2p.NewMConnTransport( - log.TestingLogger(), + log.NewNopLogger(), conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, @@ -40,7 +40,7 @@ func init() { func TestMConnTransport_AcceptBeforeListen(t *testing.T) { transport := p2p.NewMConnTransport( - log.TestingLogger(), + log.NewNopLogger(), conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{ @@ -63,7 +63,7 @@ func TestMConnTransport_AcceptMaxAcceptedConnections(t *testing.T) { defer cancel() transport := p2p.NewMConnTransport( - log.TestingLogger(), + log.NewNopLogger(), conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{ @@ -153,7 +153,7 @@ func TestMConnTransport_Listen(t *testing.T) { t.Cleanup(leaktest.Check(t)) transport := p2p.NewMConnTransport( - log.TestingLogger(), + log.NewNopLogger(), conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, diff --git a/internal/p2p/transport_memory_test.go b/internal/p2p/transport_memory_test.go index 0569faa30..33d96cdb8 100644 --- a/internal/p2p/transport_memory_test.go +++ b/internal/p2p/transport_memory_test.go @@ -19,7 +19,7 @@ func init() { testTransports["memory"] = func(t *testing.T) p2p.Transport { if network == nil { - network = p2p.NewMemoryNetwork(log.TestingLogger(), 1) + network = p2p.NewMemoryNetwork(log.NewNopLogger(), 1) } i := byte(network.Size()) nodeID, err := types.NewNodeID(hex.EncodeToString(bytes.Repeat([]byte{i<<4 + i}, 20))) diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index ca32b99e8..c3991a8e2 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -58,7 +58,7 @@ var SOCKET = "socket" func TestEcho(t *testing.T) { sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - logger := log.TestingLogger() + logger := log.NewNopLogger() client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) if err != nil { t.Fatal(err) @@ -98,7 +98,7 @@ func TestEcho(t *testing.T) { func BenchmarkEcho(b *testing.B) { b.StopTimer() // Initialize sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - logger := log.TestingLogger() + logger := log.NewNopLogger() client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) if err != nil { b.Fatal(err) @@ -146,7 +146,7 @@ func TestInfo(t *testing.T) { defer cancel() sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6)) - logger := log.TestingLogger() + logger := log.NewNopLogger() client, err := abciclient.NewClient(logger, sockPath, SOCKET, true) if err != nil { t.Fatal(err) @@ -189,7 +189,7 @@ func TestAppConns_Start_Stop(t *testing.T) { clientMock.On("Wait").Return(nil).Times(1) cl := &noopStoppableClientImpl{Client: clientMock} - appConns := New(cl, log.TestingLogger(), NopMetrics()) + appConns := New(cl, log.NewNopLogger(), NopMetrics()) err := appConns.Start(ctx) require.NoError(t, err) @@ -219,7 +219,7 @@ func TestAppConns_Failure(t *testing.T) { clientMock.On("Error").Return(errors.New("EOF")) cl := &noopStoppableClientImpl{Client: clientMock} - appConns := New(cl, log.TestingLogger(), NopMetrics()) + appConns := New(cl, log.NewNopLogger(), NopMetrics()) err := appConns.Start(ctx) require.NoError(t, err) diff --git a/internal/pubsub/example_test.go b/internal/pubsub/example_test.go index 5eea61eb8..22e735d6a 100644 --- a/internal/pubsub/example_test.go +++ b/internal/pubsub/example_test.go @@ -16,7 +16,7 @@ func TestExample(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - s := newTestServer(ctx, t, log.TestingLogger()) + s := newTestServer(ctx, t, log.NewNopLogger()) sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ ClientID: "example-client", diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index eee065fbf..a6938ff75 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -30,7 +30,7 @@ func TestSubscribeWithArgs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) t.Run("DefaultLimit", func(t *testing.T) { @@ -59,7 +59,7 @@ func TestSubscribeWithArgs(t *testing.T) { func TestObserver(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) @@ -81,7 +81,7 @@ func TestObserverErrors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) @@ -94,7 +94,7 @@ func TestPublishDoesNotBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) @@ -124,7 +124,7 @@ func TestSubscribeErrors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) t.Run("NegativeLimitErr", func(t *testing.T) { @@ -141,7 +141,7 @@ func TestSlowSubscriber(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -163,7 +163,7 @@ func TestDifferentClients(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -218,7 +218,7 @@ func TestSubscribeDuplicateKeys(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) testCases := []struct { @@ -274,7 +274,7 @@ func TestClientSubscribesTwice(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) q := query.MustCompile(`tm.events.type='NewBlock'`) @@ -310,7 +310,7 @@ func TestUnsubscribe(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) sub := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -335,7 +335,7 @@ func TestClientUnsubscribesTwice(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -357,7 +357,7 @@ func TestResubscribe(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) args := pubsub.SubscribeArgs{ @@ -381,7 +381,7 @@ func TestUnsubscribeAll(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() s := newTestServer(ctx, t, logger) sub1 := newTestSub(t).must(s.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{ @@ -402,7 +402,7 @@ func TestUnsubscribeAll(t *testing.T) { } func TestBufferCapacity(t *testing.T) { - logger := log.TestingLogger() + logger := log.NewNopLogger() s := pubsub.NewServer(logger, pubsub.BufferCapacity(2)) require.Equal(t, 2, s.BufferCapacity()) diff --git a/internal/state/execution.go b/internal/state/execution.go index 906a7ec3f..d2cfc27ba 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -150,7 +150,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock( txrSet := types.NewTxRecordSet(rpp.TxRecords) if err := txrSet.Validate(maxDataBytes, block.Txs); err != nil { - return nil, err + panic(fmt.Errorf("ResponsePrepareProposal validation: %w", err)) } for _, rtx := range txrSet.RemovedTxs() { diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 9bbe19c12..8eca18684 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -38,7 +38,7 @@ var ( func TestApplyBlock(t *testing.T) { app := &testApp{} - logger := log.TestingLogger() + logger := log.NewNopLogger() cc := abciclient.NewLocalClient(logger, app) proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) @@ -85,7 +85,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() app := &testApp{} cc := abciclient.NewLocalClient(logger, app) appClient := proxy.New(cc, logger, proxy.NopMetrics()) @@ -128,7 +128,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) { eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), appClient, mp, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), appClient, mp, evpool, blockStore, eventBus) state, _, lastCommit := makeAndCommitGoodBlock(ctx, t, state, 1, new(types.Commit), state.NextValidators.Validators[0].Address, blockExec, privVals, nil) for idx, isAbsent := range tc.absentCommitSigs { @@ -160,7 +160,7 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) { defer cancel() app := &testApp{} - logger := log.TestingLogger() + logger := log.NewNopLogger() cc := abciclient.NewLocalClient(logger, app) proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) @@ -252,7 +252,7 @@ func TestFinalizeBlockByzantineValidators(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, + blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyApp, mp, evpool, blockStore, eventBus) block := sf.MakeBlock(state, 1, new(types.Commit)) @@ -277,7 +277,7 @@ func TestProcessProposal(t *testing.T) { defer cancel() app := abcimocks.NewBaseMock() - logger := log.TestingLogger() + logger := log.NewNopLogger() cc := abciclient.NewLocalClient(logger, app) proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) @@ -482,7 +482,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) { defer cancel() app := &testApp{} - logger := log.TestingLogger() + logger := log.NewNopLogger() cc := abciclient.NewLocalClient(logger, app) proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) @@ -565,7 +565,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { defer cancel() app := &testApp{} - logger := log.TestingLogger() + logger := log.NewNopLogger() cc := abciclient.NewLocalClient(logger, app) proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) @@ -579,7 +579,7 @@ func TestFinalizeBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), + log.NewNopLogger(), proxyApp, new(mpmocks.Mempool), sm.EmptyEvidencePool{}, @@ -609,7 +609,7 @@ func TestEmptyPrepareProposal(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) @@ -653,6 +653,64 @@ func TestEmptyPrepareProposal(t *testing.T) { require.NoError(t, err) } +// TestPrepareProposalPanicOnInvalid tests that the block creation logic panics +// if the ResponsePrepareProposal returned from the application is invalid. +func TestPrepareProposalPanicOnInvalid(t *testing.T) { + const height = 2 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := log.NewNopLogger() + eventBus := eventbus.NewDefault(logger) + require.NoError(t, eventBus.Start(ctx)) + + state, stateDB, privVals := makeState(t, 1, height) + stateStore := sm.NewStore(stateDB) + + evpool := &mocks.EvidencePool{} + evpool.On("PendingEvidence", mock.Anything).Return([]types.Evidence{}, int64(0)) + + mp := &mpmocks.Mempool{} + mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{}) + + app := abcimocks.NewBaseMock() + + // create an invalid ResponsePrepareProposal + rpp := abci.ResponsePrepareProposal{ + ModifiedTx: true, + TxRecords: []*abci.TxRecord{ + { + Action: abci.TxRecord_REMOVED, + Tx: []byte("new tx"), + }, + }, + } + app.On("PrepareProposal", mock.Anything).Return(rpp, nil) + + cc := abciclient.NewLocalClient(logger, app) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) + err := proxyApp.Start(ctx) + require.NoError(t, err) + + blockExec := sm.NewBlockExecutor( + stateStore, + logger, + proxyApp, + mp, + evpool, + nil, + eventBus, + ) + pa, _ := state.Validators.GetByIndex(0) + commit := makeValidCommit(ctx, t, height, types.BlockID{}, state.Validators, privVals) + require.Panics(t, + func() { + blockExec.CreateProposalBlock(ctx, height, state, commit, pa, nil) //nolint:errcheck + }) + + mp.AssertExpectations(t) +} + // TestPrepareProposalRemoveTxs tests that any transactions marked as REMOVED // are not included in the block produced by CreateProposalBlock. The test also // ensures that any transactions removed are also removed from the mempool. @@ -661,7 +719,7 @@ func TestPrepareProposalRemoveTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) @@ -722,7 +780,7 @@ func TestPrepareProposalAddedTxsIncluded(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) @@ -781,7 +839,7 @@ func TestPrepareProposalReorderTxs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) @@ -839,7 +897,7 @@ func TestPrepareProposalModifiedTxStatusFalse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() eventBus := eventbus.NewDefault(logger) require.NoError(t, eventBus.Start(ctx)) diff --git a/internal/state/indexer/indexer_service_test.go b/internal/state/indexer/indexer_service_test.go index f6261c519..a71b204ec 100644 --- a/internal/state/indexer/indexer_service_test.go +++ b/internal/state/indexer/indexer_service_test.go @@ -43,7 +43,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := tmlog.TestingLogger() + logger := tmlog.NewNopLogger() // event bus eventBus := eventbus.NewDefault(logger) err := eventBus.Start(ctx) diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index 62d2051cb..b7b56adb8 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -33,7 +33,7 @@ const validationTestsStopHeight int64 = 10 func TestValidateBlockHeader(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) @@ -136,7 +136,7 @@ func TestValidateBlockCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) @@ -277,7 +277,7 @@ func TestValidateBlockEvidence(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.TestingLogger() + logger := log.NewNopLogger() proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) @@ -309,7 +309,7 @@ func TestValidateBlockEvidence(t *testing.T) { state.ConsensusParams.Evidence.MaxBytes = 1000 blockExec := sm.NewBlockExecutor( stateStore, - log.TestingLogger(), + log.NewNopLogger(), proxyApp, mp, evpool, diff --git a/libs/events/events.go b/libs/events/events.go index 5ab5961f6..d96afd7bd 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" ) // ErrListenerWasRemoved is returned by AddEvent if the listener was removed. @@ -45,16 +44,11 @@ type Fireable interface { // They can be removed by calling either RemoveListenerForEvent or // RemoveListener (for all events). type EventSwitch interface { - service.Service Fireable - Stop() - AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error } type eventSwitch struct { - service.BaseService - mtx sync.RWMutex eventCells map[string]*eventCell listeners map[string]*eventListener @@ -65,13 +59,9 @@ func NewEventSwitch(logger log.Logger) EventSwitch { eventCells: make(map[string]*eventCell), listeners: make(map[string]*eventListener), } - evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw) return evsw } -func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil } -func (evsw *eventSwitch) OnStop() {} - func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { // Get/Create eventCell and listener. evsw.mtx.Lock() diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 15f208e06..55e0103da 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -21,8 +21,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) { logger := log.NewTestingLogger(t) evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", @@ -50,8 +48,6 @@ func TestAddListenerForEventFireMany(t *testing.T) { logger := log.NewTestingLogger(t) evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) doneSum := make(chan uint64) doneSending := make(chan uint64) @@ -88,8 +84,6 @@ func TestAddListenerForDifferentEvents(t *testing.T) { logger := log.NewTestingLogger(t) evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) doneSum := make(chan uint64) doneSending1 := make(chan uint64) @@ -151,9 +145,6 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { logger := log.NewTestingLogger(t) evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - - t.Cleanup(evsw.Wait) doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) @@ -247,8 +238,6 @@ func TestManageListenersAsync(t *testing.T) { logger := log.NewTestingLogger(t) evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) diff --git a/libs/log/testing.go b/libs/log/testing.go index 649ebab40..464a7036f 100644 --- a/libs/log/testing.go +++ b/libs/log/testing.go @@ -6,24 +6,6 @@ import ( "github.com/rs/zerolog" ) -// TestingLogger was a legacy constructor that wrote logging output to -// standardoutput when in verbose mode, and no-op'ed test logs -// otherwise. Now it always no-ops, but if you need to see logs from -// tests, you can replace this call with `NewTestingLogger` -// constructor. -func TestingLogger() Logger { - return NewNopLogger() -} - -type testingWriter struct { - t testing.TB -} - -func (tw testingWriter) Write(in []byte) (int, error) { - tw.t.Log(string(in)) - return len(in), nil -} - // NewTestingLogger converts a testing.T into a logging interface to // make test failures and verbose provide better feedback associated // with test failures. This logging instance is safe for use from @@ -58,3 +40,12 @@ func NewTestingLoggerWithLevel(t testing.TB, level string) Logger { Logger: zerolog.New(newSyncWriter(testingWriter{t})).Level(logLevel), } } + +type testingWriter struct { + t testing.TB +} + +func (tw testingWriter) Write(in []byte) (int, error) { + tw.t.Log(string(in)) + return len(in), nil +} diff --git a/light/client_test.go b/light/client_test.go index d2f5a137c..fbf8536ba 100644 --- a/light/client_test.go +++ b/light/client_test.go @@ -224,7 +224,7 @@ func TestClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockNode := mockNodeFromHeadersAndVals(testCase.otherHeaders, testCase.vals) mockNode.On("LightBlock", mock.Anything, mock.Anything).Return(nil, provider.ErrLightBlockNotFound) @@ -351,7 +351,7 @@ func TestClient(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(bctx) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockNode := mockNodeFromHeadersAndVals(tc.otherHeaders, tc.vals) mockNode.On("LightBlock", mock.Anything, mock.Anything).Return(nil, provider.ErrLightBlockNotFound) @@ -466,7 +466,7 @@ func TestClient(t *testing.T) { t.Run("Cleanup", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockFullNode := &provider_mocks.Provider{} mockFullNode.On("LightBlock", mock.Anything, int64(1)).Return(l1, nil) @@ -503,7 +503,7 @@ func TestClient(t *testing.T) { ctx, cancel := context.WithCancel(bctx) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockNode := &provider_mocks.Provider{} trustedStore := dbs.New(dbm.NewMemDB()) @@ -538,7 +538,7 @@ func TestClient(t *testing.T) { err := trustedStore.SaveLightBlock(l1) require.NoError(t, err) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // header1 != h1 header1 := keys.GenSignedHeader(t, chainID, 1, bTime.Add(1*time.Hour), nil, vals, vals, @@ -584,7 +584,7 @@ func TestClient(t *testing.T) { mockWitnessNode.On("LightBlock", mock.Anything, int64(1)).Return(l1, nil) mockWitnessNode.On("LightBlock", mock.Anything, int64(3)).Return(l3, nil) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() c, err := light.NewClient( ctx, @@ -611,7 +611,7 @@ func TestClient(t *testing.T) { t.Run("Concurrency", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockFullNode := &provider_mocks.Provider{} mockFullNode.On("LightBlock", mock.Anything, int64(2)).Return(l2, nil) @@ -664,7 +664,7 @@ func TestClient(t *testing.T) { 1: h1, 2: h2, }, valSet) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() c, err := light.NewClient( ctx, @@ -705,7 +705,7 @@ func TestClient(t *testing.T) { mockDeadNode.On("LightBlock", mock.Anything, mock.Anything).Return(nil, provider.ErrNoResponse) mockDeadNode.On("ID").Return("mockDeadNode") - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() c, err := light.NewClient( ctx, @@ -738,7 +738,7 @@ func TestClient(t *testing.T) { mockFullNode.On("LightBlock", mock.Anything, mock.Anything).Return(l1, nil) mockFullNode.On("ID").Return("mockFullNode") - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() mockDeadNode1 := &provider_mocks.Provider{} mockDeadNode1.On("LightBlock", mock.Anything, mock.Anything).Return(nil, provider.ErrLightBlockNotFound) @@ -770,7 +770,7 @@ func TestClient(t *testing.T) { t.Run("BackwardsVerification", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() { headers, vals, _ := genLightBlocksWithKeys(t, chainID, 9, 3, 0, bTime) @@ -886,7 +886,7 @@ func TestClient(t *testing.T) { mockWitness.AssertExpectations(t) }) t.Run("RemovesWitnessIfItSendsUsIncorrectHeader", func(t *testing.T) { - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // different headers hash then primary plus less than 1/3 signed (no fork) headers1 := map[int64]*types.SignedHeader{ @@ -959,7 +959,7 @@ func TestClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() differentVals, _ := factory.ValidatorSet(ctx, t, 10, 100) mockBadValSetNode := mockNodeFromHeadersAndVals( @@ -1043,7 +1043,7 @@ func TestClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() c, err := light.NewClient( ctx, diff --git a/light/detector_test.go b/light/detector_test.go index 2e5304822..82b0d2de6 100644 --- a/light/detector_test.go +++ b/light/detector_test.go @@ -21,7 +21,7 @@ import ( ) func TestLightClientAttackEvidence_Lunatic(t *testing.T) { - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // primary performs a lunatic attack var ( @@ -144,7 +144,7 @@ func TestLightClientAttackEvidence_Equivocation(t *testing.T) { ctx, cancel := context.WithCancel(bctx) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // primary performs an equivocation attack var ( @@ -248,7 +248,7 @@ func TestLightClientAttackEvidence_ForwardLunatic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() witnessHeaders, witnessValidators, chainKeys := genLightBlocksWithKeys(t, chainID, latestHeight, valSize, 2, bTime) for _, unusedHeader := range []int64{3, 5, 6, 8} { @@ -411,7 +411,7 @@ func TestClientDivergentTraces1(t *testing.T) { mockWitness := mockNodeFromHeadersAndVals(headers, vals) mockWitness.On("ID").Return("mockWitness") - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() _, err = light.NewClient( ctx, @@ -437,7 +437,7 @@ func TestClientDivergentTraces1(t *testing.T) { func TestClientDivergentTraces2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() headers, vals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime) mockPrimaryNode := mockNodeFromHeadersAndVals(headers, vals) @@ -482,7 +482,7 @@ func TestClientDivergentTraces2(t *testing.T) { // => creation should succeed, but the verification should fail //nolint: dupl func TestClientDivergentTraces3(t *testing.T) { - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime) @@ -527,7 +527,7 @@ func TestClientDivergentTraces3(t *testing.T) { // It should be ignored //nolint: dupl func TestClientDivergentTraces4(t *testing.T) { - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // primaryHeaders, primaryVals, _ := genLightBlocksWithKeys(t, chainID, 2, 5, 2, bTime) diff --git a/light/light_test.go b/light/light_test.go index 35e6d1933..58fd9f0b1 100644 --- a/light/light_test.go +++ b/light/light_test.go @@ -30,7 +30,7 @@ func TestClientIntegration_Update(t *testing.T) { conf, err := rpctest.CreateConfig(t, t.Name()) require.NoError(t, err) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // Start a test application app := kvstore.NewApplication() @@ -90,7 +90,7 @@ func TestClientIntegration_VerifyLightBlockAtHeight(t *testing.T) { conf, err := rpctest.CreateConfig(t, t.Name()) require.NoError(t, err) - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // Start a test application app := kvstore.NewApplication() @@ -201,7 +201,7 @@ func TestClientStatusRPC(t *testing.T) { primary, witnesses, dbs.New(db), - light.Logger(log.TestingLogger()), + light.Logger(log.NewNopLogger()), ) require.NoError(t, err) diff --git a/types/keys.go b/types/keys.go deleted file mode 100644 index 941e82b65..000000000 --- a/types/keys.go +++ /dev/null @@ -1,6 +0,0 @@ -package types - -// UNSTABLE -var ( - PeerStateKey = "ConsensusReactor.peerState" -) diff --git a/types/signable.go b/types/signable.go index 25e307316..85b9bb911 100644 --- a/types/signable.go +++ b/types/signable.go @@ -11,13 +11,3 @@ var ( // XXX: secp256k1 does not have Size nor MaxSize defined. MaxSignatureSize = tmmath.MaxInt(ed25519.SignatureSize, 64) ) - -// Signable is an interface for all signable things. -// It typically removes signatures before serializing. -// SignBytes returns the bytes to be signed -// NOTE: chainIDs are part of the SignBytes but not -// necessarily the object themselves. -// NOTE: Expected to panic if there is an error marshaling. -type Signable interface { - SignBytes(chainID string) []byte -} diff --git a/types/vote_set.go b/types/vote_set.go index 438d089b3..142a4130c 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -18,12 +18,6 @@ const ( MaxVotesCount = 10000 ) -// UNSTABLE -// XXX: duplicate of p2p.ID to avoid dependence between packages. -// Perhaps we can have a minimal types package containing this (and other things?) -// that both `types` and `p2p` import ? -type P2PID string - /* VoteSet helps collect signatures from validators at each height+round for a predefined vote type. @@ -71,7 +65,7 @@ type VoteSet struct { sum int64 // Sum of voting power for seen votes, discounting conflicts maj23 *BlockID // First 2/3 majority seen votesByBlock map[string]*blockVotes // string(blockHash|blockParts) -> blockVotes - peerMaj23s map[P2PID]BlockID // Maj23 for each peer + peerMaj23s map[string]BlockID // Maj23 for each peer } // Constructs a new VoteSet struct used to accumulate votes for given height/round. @@ -91,7 +85,7 @@ func NewVoteSet(chainID string, height int64, round int32, sum: 0, maj23: nil, votesByBlock: make(map[string]*blockVotes, valSet.Size()), - peerMaj23s: make(map[P2PID]BlockID), + peerMaj23s: make(map[string]BlockID), } } @@ -313,7 +307,7 @@ func (voteSet *VoteSet) addVerifiedVote( // this can cause memory issues. // TODO: implement ability to remove peers too // NOTE: VoteSet must not be nil -func (voteSet *VoteSet) SetPeerMaj23(peerID P2PID, blockID BlockID) error { +func (voteSet *VoteSet) SetPeerMaj23(peerID string, blockID BlockID) error { if voteSet == nil { panic("SetPeerMaj23() on nil VoteSet") } @@ -530,9 +524,9 @@ func (voteSet *VoteSet) MarshalJSON() ([]byte, error) { // NOTE: insufficient for unmarshaling from (compressed votes) // TODO: make the peerMaj23s nicer to read (eg just the block hash) type VoteSetJSON struct { - Votes []string `json:"votes"` - VotesBitArray string `json:"votes_bit_array"` - PeerMaj23s map[P2PID]BlockID `json:"peer_maj_23s"` + Votes []string `json:"votes"` + VotesBitArray string `json:"votes_bit_array"` + PeerMaj23s map[string]BlockID `json:"peer_maj_23s"` } // Return the bit-array of votes including