From d0e33b429288fb7d284205732beb82b59b4dea76 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Mon, 23 Aug 2021 17:45:08 +0300 Subject: [PATCH] blocksync: complete transition from Blockchain to BlockSync (#6847) --- internal/blocksync/v0/reactor.go | 118 +++++++++++++------------- internal/blocksync/v0/reactor_test.go | 38 ++++----- node/setup.go | 4 +- 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/internal/blocksync/v0/reactor.go b/internal/blocksync/v0/reactor.go index c43959808..4ddfa4edc 100644 --- a/internal/blocksync/v0/reactor.go +++ b/internal/blocksync/v0/reactor.go @@ -29,10 +29,10 @@ var ( // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - BlockchainChannel: { + BlockSyncChannel: { MsgType: new(bcproto.Message), Descriptor: &p2p.ChannelDescriptor{ - ID: byte(BlockchainChannel), + ID: byte(BlockSyncChannel), Priority: 5, SendQueueCapacity: 1000, RecvBufferCapacity: 1024, @@ -44,8 +44,8 @@ var ( ) const ( - // BlockchainChannel is a channel for blocks and status updates - BlockchainChannel = p2p.ChannelID(0x40) + // BlockSyncChannel is a channel for blocks and status updates + BlockSyncChannel = p2p.ChannelID(0x40) trySyncIntervalMS = 10 @@ -60,7 +60,7 @@ const ( ) type consensusReactor interface { - // For when we switch from blockchain reactor and block sync to the consensus + // For when we switch from block sync reactor to the consensus // machine. SwitchToConsensus(state sm.State, skipWAL bool) } @@ -87,17 +87,17 @@ type Reactor struct { consReactor consensusReactor blockSync *tmSync.AtomicBool - blockchainCh *p2p.Channel - // blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope - // messages that the reactor will consume in processBlockchainCh and receiving messages + blockSyncCh *p2p.Channel + // blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope + // messages that the reactor will consume in processBlockSyncCh and receiving messages // from the peer updates channel and other goroutines. We do this instead of directly - // sending on blockchainCh.Out to avoid race conditions in the case where other goroutines - // send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh - // may close the blockchainCh.Out channel at the same time that other goroutines send to - // blockchainCh.Out. - blockchainOutBridgeCh chan p2p.Envelope - peerUpdates *p2p.PeerUpdates - closeCh chan struct{} + // sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines + // send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh + // may close the blockSyncCh.Out channel at the same time that other goroutines send to + // blockSyncCh.Out. + blockSyncOutBridgeCh chan p2p.Envelope + peerUpdates *p2p.PeerUpdates + closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError @@ -119,7 +119,7 @@ func NewReactor( blockExec *sm.BlockExecutor, store *store.BlockStore, consReactor consensusReactor, - blockchainCh *p2p.Channel, + blockSyncCh *p2p.Channel, peerUpdates *p2p.PeerUpdates, blockSync bool, metrics *cons.Metrics, @@ -137,23 +137,23 @@ func NewReactor( errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. r := &Reactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: NewBlockPool(startHeight, requestsCh, errorsCh), - consReactor: consReactor, - blockSync: tmSync.NewBool(blockSync), - requestsCh: requestsCh, - errorsCh: errorsCh, - blockchainCh: blockchainCh, - blockchainOutBridgeCh: make(chan p2p.Envelope), - peerUpdates: peerUpdates, - closeCh: make(chan struct{}), - metrics: metrics, - syncStartTime: time.Time{}, + initialState: state, + blockExec: blockExec, + store: store, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, + blockSync: tmSync.NewBool(blockSync), + requestsCh: requestsCh, + errorsCh: errorsCh, + blockSyncCh: blockSyncCh, + blockSyncOutBridgeCh: make(chan p2p.Envelope), + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), + metrics: metrics, + syncStartTime: time.Time{}, } - r.BaseService = *service.NewBaseService(logger, "Blockchain", r) + r.BaseService = *service.NewBaseService(logger, "BlockSync", r) return r, nil } @@ -174,7 +174,7 @@ func (r *Reactor) OnStart() error { go r.poolRoutine(false) } - go r.processBlockchainCh() + go r.processBlockSyncCh() go r.processPeerUpdates() return nil @@ -199,7 +199,7 @@ func (r *Reactor) OnStop() { // Wait for all p2p Channels to be closed before returning. This ensures we // can easily reason about synchronization of all p2p Channels and ensure no // panics will occur. - <-r.blockchainCh.Done() + <-r.blockSyncCh.Done() <-r.peerUpdates.Done() } @@ -214,7 +214,7 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID) return } - r.blockchainCh.Out <- p2p.Envelope{ + r.blockSyncCh.Out <- p2p.Envelope{ To: peerID, Message: &bcproto.BlockResponse{Block: blockProto}, } @@ -223,16 +223,16 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID) } r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) - r.blockchainCh.Out <- p2p.Envelope{ + r.blockSyncCh.Out <- p2p.Envelope{ To: peerID, Message: &bcproto.NoBlockResponse{Height: msg.Height}, } } -// handleBlockchainMessage handles envelopes sent from peers on the -// BlockchainChannel. It returns an error only if the Envelope.Message is unknown +// handleBlockSyncMessage handles envelopes sent from peers on the +// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. -func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { +func (r *Reactor) handleBlockSyncMessage(envelope p2p.Envelope) error { logger := r.Logger.With("peer", envelope.From) switch msg := envelope.Message.(type) { @@ -249,7 +249,7 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { r.pool.AddBlock(envelope.From, block, block.Size()) case *bcproto.StatusRequest: - r.blockchainCh.Out <- p2p.Envelope{ + r.blockSyncCh.Out <- p2p.Envelope{ To: envelope.From, Message: &bcproto.StatusResponse{ Height: r.store.Height(), @@ -288,8 +288,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) switch chID { - case BlockchainChannel: - err = r.handleBlockchainMessage(envelope) + case BlockSyncChannel: + err = r.handleBlockSyncMessage(envelope) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) @@ -298,30 +298,30 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err return err } -// processBlockchainCh initiates a blocking process where we listen for and handle -// envelopes on the BlockchainChannel and blockchainOutBridgeCh. Any error encountered during -// message execution will result in a PeerError being sent on the BlockchainChannel. +// processBlockSyncCh initiates a blocking process where we listen for and handle +// envelopes on the BlockSyncChannel and blockSyncOutBridgeCh. Any error encountered during +// message execution will result in a PeerError being sent on the BlockSyncChannel. // When the reactor is stopped, we will catch the signal and close the p2p Channel // gracefully. -func (r *Reactor) processBlockchainCh() { - defer r.blockchainCh.Close() +func (r *Reactor) processBlockSyncCh() { + defer r.blockSyncCh.Close() for { select { - case envelope := <-r.blockchainCh.In: - if err := r.handleMessage(r.blockchainCh.ID, envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID, "envelope", envelope, "err", err) - r.blockchainCh.Error <- p2p.PeerError{ + case envelope := <-r.blockSyncCh.In: + if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err) + r.blockSyncCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } } - case envelope := <-r.blockchainOutBridgeCh: - r.blockchainCh.Out <- envelope + case envelope := <-r.blockSyncOutBridgeCh: + r.blockSyncCh.Out <- envelope case <-r.closeCh: - r.Logger.Debug("stopped listening on blockchain channel; closing...") + r.Logger.Debug("stopped listening on block sync channel; closing...") return } @@ -340,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: // send a status update the newly added peer - r.blockchainOutBridgeCh <- p2p.Envelope{ + r.blockSyncOutBridgeCh <- p2p.Envelope{ To: peerUpdate.NodeID, Message: &bcproto.StatusResponse{ Base: r.store.Base(), @@ -406,13 +406,13 @@ func (r *Reactor) requestRoutine() { return case request := <-r.requestsCh: - r.blockchainOutBridgeCh <- p2p.Envelope{ + r.blockSyncOutBridgeCh <- p2p.Envelope{ To: request.PeerID, Message: &bcproto.BlockRequest{Height: request.Height}, } case pErr := <-r.errorsCh: - r.blockchainCh.Error <- p2p.PeerError{ + r.blockSyncCh.Error <- p2p.PeerError{ NodeID: pErr.peerID, Err: pErr.err, } @@ -423,7 +423,7 @@ func (r *Reactor) requestRoutine() { go func() { defer r.poolWG.Done() - r.blockchainOutBridgeCh <- p2p.Envelope{ + r.blockSyncOutBridgeCh <- p2p.Envelope{ Broadcast: true, Message: &bcproto.StatusRequest{}, } @@ -554,14 +554,14 @@ FOR_LOOP: // NOTE: We've already removed the peer's request, but we still need // to clean up the rest. peerID := r.pool.RedoRequest(first.Height) - r.blockchainCh.Error <- p2p.PeerError{ + r.blockSyncCh.Error <- p2p.PeerError{ NodeID: peerID, Err: err, } peerID2 := r.pool.RedoRequest(second.Height) if peerID2 != peerID { - r.blockchainCh.Error <- p2p.PeerError{ + r.blockSyncCh.Error <- p2p.PeerError{ NodeID: peerID2, Err: err, } diff --git a/internal/blocksync/v0/reactor_test.go b/internal/blocksync/v0/reactor_test.go index e038b57af..a1ddc02cd 100644 --- a/internal/blocksync/v0/reactor_test.go +++ b/internal/blocksync/v0/reactor_test.go @@ -32,9 +32,9 @@ type reactorTestSuite struct { reactors map[types.NodeID]*Reactor app map[types.NodeID]proxy.AppConns - blockchainChannels map[types.NodeID]*p2p.Channel - peerChans map[types.NodeID]chan p2p.PeerUpdate - peerUpdates map[types.NodeID]*p2p.PeerUpdates + blockSyncChannels map[types.NodeID]*p2p.Channel + peerChans map[types.NodeID]chan p2p.PeerUpdate + peerUpdates map[types.NodeID]*p2p.PeerUpdates blockSync bool } @@ -53,19 +53,19 @@ func setup( "must specify at least one block height (nodes)") rts := &reactorTestSuite{ - logger: log.TestingLogger().With("module", "blockchain", "testCase", t.Name()), - network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), - nodes: make([]types.NodeID, 0, numNodes), - reactors: make(map[types.NodeID]*Reactor, numNodes), - app: make(map[types.NodeID]proxy.AppConns, numNodes), - blockchainChannels: make(map[types.NodeID]*p2p.Channel, numNodes), - peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), - peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), - blockSync: true, + logger: log.TestingLogger().With("module", "block_sync", "testCase", t.Name()), + network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), + nodes: make([]types.NodeID, 0, numNodes), + reactors: make(map[types.NodeID]*Reactor, numNodes), + app: make(map[types.NodeID]proxy.AppConns, numNodes), + blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), + peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), + peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), + blockSync: true, } - chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)} - rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) + chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)} + rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) i := 0 for nodeID := range rts.network.Nodes { @@ -161,7 +161,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T, blockExec, blockStore, nil, - rts.blockchainChannels[nodeID], + rts.blockSyncChannels[nodeID], rts.peerUpdates[nodeID], rts.blockSync, cons.NopMetrics()) @@ -181,7 +181,7 @@ func (rts *reactorTestSuite) start(t *testing.T) { } func TestReactor_AbruptDisconnect(t *testing.T) { - config := cfg.ResetTestRoot("blockchain_reactor_test") + config := cfg.ResetTestRoot("block_sync_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) @@ -216,7 +216,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) { } func TestReactor_SyncTime(t *testing.T) { - config := cfg.ResetTestRoot("blockchain_reactor_test") + config := cfg.ResetTestRoot("block_sync_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) @@ -239,7 +239,7 @@ func TestReactor_SyncTime(t *testing.T) { } func TestReactor_NoBlockResponse(t *testing.T) { - config := cfg.ResetTestRoot("blockchain_reactor_test") + config := cfg.ResetTestRoot("block_sync_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) @@ -286,7 +286,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { // See: https://github.com/tendermint/tendermint/issues/6005 t.SkipNow() - config := cfg.ResetTestRoot("blockchain_reactor_test") + config := cfg.ResetTestRoot("block_sync_reactor_test") defer os.RemoveAll(config.RootDir) maxBlockHeight := int64(48) diff --git a/node/setup.go b/node/setup.go index ceadcd688..e9bfb029a 100644 --- a/node/setup.go +++ b/node/setup.go @@ -362,7 +362,7 @@ func createBlockchainReactor( reactor, err := bcv0.NewReactor( logger, state.Copy(), blockExec, blockStore, csReactor, - channels[bcv0.BlockchainChannel], peerUpdates, blockSync, + channels[bcv0.BlockSyncChannel], peerUpdates, blockSync, metrics, ) if err != nil { @@ -727,7 +727,7 @@ func makeNodeInfo( var bcChannel byte switch config.BlockSync.Version { case cfg.BlockSyncV0: - bcChannel = byte(bcv0.BlockchainChannel) + bcChannel = byte(bcv0.BlockSyncChannel) case cfg.BlockSyncV2: bcChannel = bcv2.BlockchainChannel