Browse Source

blocksync: complete transition from Blockchain to BlockSync (#6847)

pull/6852/head
Hlib Kanunnikov 3 years ago
committed by GitHub
parent
commit
d0e33b4292
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 80 deletions
  1. +59
    -59
      internal/blocksync/v0/reactor.go
  2. +19
    -19
      internal/blocksync/v0/reactor_test.go
  3. +2
    -2
      node/setup.go

+ 59
- 59
internal/blocksync/v0/reactor.go View File

@ -29,10 +29,10 @@ var (
// TODO: Remove once p2p refactor is complete. // TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670 // ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockchainChannel: {
BlockSyncChannel: {
MsgType: new(bcproto.Message), MsgType: new(bcproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(BlockchainChannel),
ID: byte(BlockSyncChannel),
Priority: 5, Priority: 5,
SendQueueCapacity: 1000, SendQueueCapacity: 1000,
RecvBufferCapacity: 1024, RecvBufferCapacity: 1024,
@ -44,8 +44,8 @@ var (
) )
const ( const (
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = p2p.ChannelID(0x40)
// BlockSyncChannel is a channel for blocks and status updates
BlockSyncChannel = p2p.ChannelID(0x40)
trySyncIntervalMS = 10 trySyncIntervalMS = 10
@ -60,7 +60,7 @@ const (
) )
type consensusReactor interface { type consensusReactor interface {
// For when we switch from blockchain reactor and block sync to the consensus
// For when we switch from block sync reactor to the consensus
// machine. // machine.
SwitchToConsensus(state sm.State, skipWAL bool) SwitchToConsensus(state sm.State, skipWAL bool)
} }
@ -87,17 +87,17 @@ type Reactor struct {
consReactor consensusReactor consReactor consensusReactor
blockSync *tmSync.AtomicBool blockSync *tmSync.AtomicBool
blockchainCh *p2p.Channel
// blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockchainCh and receiving messages
blockSyncCh *p2p.Channel
// blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockSyncCh and receiving messages
// from the peer updates channel and other goroutines. We do this instead of directly // from the peer updates channel and other goroutines. We do this instead of directly
// sending on blockchainCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh
// may close the blockchainCh.Out channel at the same time that other goroutines send to
// blockchainCh.Out.
blockchainOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the to blockSyncCh.Out channel, since processBlockSyncCh
// may close the blockSyncCh.Out channel at the same time that other goroutines send to
// blockSyncCh.Out.
blockSyncOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
requestsCh <-chan BlockRequest requestsCh <-chan BlockRequest
errorsCh <-chan peerError errorsCh <-chan peerError
@ -119,7 +119,7 @@ func NewReactor(
blockExec *sm.BlockExecutor, blockExec *sm.BlockExecutor,
store *store.BlockStore, store *store.BlockStore,
consReactor consensusReactor, consReactor consensusReactor,
blockchainCh *p2p.Channel,
blockSyncCh *p2p.Channel,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
blockSync bool, blockSync bool,
metrics *cons.Metrics, metrics *cons.Metrics,
@ -137,23 +137,23 @@ func NewReactor(
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r := &Reactor{ r := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: tmSync.NewBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
blockchainOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
metrics: metrics,
syncStartTime: time.Time{},
initialState: state,
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
blockSync: tmSync.NewBool(blockSync),
requestsCh: requestsCh,
errorsCh: errorsCh,
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
metrics: metrics,
syncStartTime: time.Time{},
} }
r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
return r, nil return r, nil
} }
@ -174,7 +174,7 @@ func (r *Reactor) OnStart() error {
go r.poolRoutine(false) go r.poolRoutine(false)
} }
go r.processBlockchainCh()
go r.processBlockSyncCh()
go r.processPeerUpdates() go r.processPeerUpdates()
return nil return nil
@ -199,7 +199,7 @@ func (r *Reactor) OnStop() {
// Wait for all p2p Channels to be closed before returning. This ensures we // Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no // can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur. // panics will occur.
<-r.blockchainCh.Done()
<-r.blockSyncCh.Done()
<-r.peerUpdates.Done() <-r.peerUpdates.Done()
} }
@ -214,7 +214,7 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID)
return return
} }
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: peerID, To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto}, Message: &bcproto.BlockResponse{Block: blockProto},
} }
@ -223,16 +223,16 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID types.NodeID)
} }
r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: peerID, To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height}, Message: &bcproto.NoBlockResponse{Height: msg.Height},
} }
} }
// handleBlockchainMessage handles envelopes sent from peers on the
// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
// handleBlockSyncMessage handles envelopes sent from peers on the
// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage. // for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
func (r *Reactor) handleBlockSyncMessage(envelope p2p.Envelope) error {
logger := r.Logger.With("peer", envelope.From) logger := r.Logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -249,7 +249,7 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
r.pool.AddBlock(envelope.From, block, block.Size()) r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest: case *bcproto.StatusRequest:
r.blockchainCh.Out <- p2p.Envelope{
r.blockSyncCh.Out <- p2p.Envelope{
To: envelope.From, To: envelope.From,
Message: &bcproto.StatusResponse{ Message: &bcproto.StatusResponse{
Height: r.store.Height(), Height: r.store.Height(),
@ -288,8 +288,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
switch chID { switch chID {
case BlockchainChannel:
err = r.handleBlockchainMessage(envelope)
case BlockSyncChannel:
err = r.handleBlockSyncMessage(envelope)
default: default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@ -298,30 +298,30 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
return err return err
} }
// processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel and blockchainOutBridgeCh. Any error encountered during
// message execution will result in a PeerError being sent on the BlockchainChannel.
// processBlockSyncCh initiates a blocking process where we listen for and handle
// envelopes on the BlockSyncChannel and blockSyncOutBridgeCh. Any error encountered during
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel // When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processBlockchainCh() {
defer r.blockchainCh.Close()
func (r *Reactor) processBlockSyncCh() {
defer r.blockSyncCh.Close()
for { for {
select { select {
case envelope := <-r.blockchainCh.In:
if err := r.handleMessage(r.blockchainCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID, "envelope", envelope, "err", err)
r.blockchainCh.Error <- p2p.PeerError{
case envelope := <-r.blockSyncCh.In:
if err := r.handleMessage(r.blockSyncCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: envelope.From, NodeID: envelope.From,
Err: err, Err: err,
} }
} }
case envelope := <-r.blockchainOutBridgeCh:
r.blockchainCh.Out <- envelope
case envelope := <-r.blockSyncOutBridgeCh:
r.blockSyncCh.Out <- envelope
case <-r.closeCh: case <-r.closeCh:
r.Logger.Debug("stopped listening on blockchain channel; closing...")
r.Logger.Debug("stopped listening on block sync channel; closing...")
return return
} }
@ -340,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status { switch peerUpdate.Status {
case p2p.PeerStatusUp: case p2p.PeerStatusUp:
// send a status update the newly added peer // send a status update the newly added peer
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
To: peerUpdate.NodeID, To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{ Message: &bcproto.StatusResponse{
Base: r.store.Base(), Base: r.store.Base(),
@ -406,13 +406,13 @@ func (r *Reactor) requestRoutine() {
return return
case request := <-r.requestsCh: case request := <-r.requestsCh:
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
To: request.PeerID, To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height}, Message: &bcproto.BlockRequest{Height: request.Height},
} }
case pErr := <-r.errorsCh: case pErr := <-r.errorsCh:
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: pErr.peerID, NodeID: pErr.peerID,
Err: pErr.err, Err: pErr.err,
} }
@ -423,7 +423,7 @@ func (r *Reactor) requestRoutine() {
go func() { go func() {
defer r.poolWG.Done() defer r.poolWG.Done()
r.blockchainOutBridgeCh <- p2p.Envelope{
r.blockSyncOutBridgeCh <- p2p.Envelope{
Broadcast: true, Broadcast: true,
Message: &bcproto.StatusRequest{}, Message: &bcproto.StatusRequest{},
} }
@ -554,14 +554,14 @@ FOR_LOOP:
// NOTE: We've already removed the peer's request, but we still need // NOTE: We've already removed the peer's request, but we still need
// to clean up the rest. // to clean up the rest.
peerID := r.pool.RedoRequest(first.Height) peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: peerID, NodeID: peerID,
Err: err, Err: err,
} }
peerID2 := r.pool.RedoRequest(second.Height) peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID { if peerID2 != peerID {
r.blockchainCh.Error <- p2p.PeerError{
r.blockSyncCh.Error <- p2p.PeerError{
NodeID: peerID2, NodeID: peerID2,
Err: err, Err: err,
} }


+ 19
- 19
internal/blocksync/v0/reactor_test.go View File

@ -32,9 +32,9 @@ type reactorTestSuite struct {
reactors map[types.NodeID]*Reactor reactors map[types.NodeID]*Reactor
app map[types.NodeID]proxy.AppConns app map[types.NodeID]proxy.AppConns
blockchainChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
blockSyncChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
peerUpdates map[types.NodeID]*p2p.PeerUpdates
blockSync bool blockSync bool
} }
@ -53,19 +53,19 @@ func setup(
"must specify at least one block height (nodes)") "must specify at least one block height (nodes)")
rts := &reactorTestSuite{ rts := &reactorTestSuite{
logger: log.TestingLogger().With("module", "blockchain", "testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
blockchainChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
blockSync: true,
logger: log.TestingLogger().With("module", "block_sync", "testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
blockSync: true,
} }
chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)}
rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
i := 0 i := 0
for nodeID := range rts.network.Nodes { for nodeID := range rts.network.Nodes {
@ -161,7 +161,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
blockExec, blockExec,
blockStore, blockStore,
nil, nil,
rts.blockchainChannels[nodeID],
rts.blockSyncChannels[nodeID],
rts.peerUpdates[nodeID], rts.peerUpdates[nodeID],
rts.blockSync, rts.blockSync,
cons.NopMetrics()) cons.NopMetrics())
@ -181,7 +181,7 @@ func (rts *reactorTestSuite) start(t *testing.T) {
} }
func TestReactor_AbruptDisconnect(t *testing.T) { func TestReactor_AbruptDisconnect(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@ -216,7 +216,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
} }
func TestReactor_SyncTime(t *testing.T) { func TestReactor_SyncTime(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@ -239,7 +239,7 @@ func TestReactor_SyncTime(t *testing.T) {
} }
func TestReactor_NoBlockResponse(t *testing.T) { func TestReactor_NoBlockResponse(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
@ -286,7 +286,7 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
// See: https://github.com/tendermint/tendermint/issues/6005 // See: https://github.com/tendermint/tendermint/issues/6005
t.SkipNow() t.SkipNow()
config := cfg.ResetTestRoot("blockchain_reactor_test")
config := cfg.ResetTestRoot("block_sync_reactor_test")
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
maxBlockHeight := int64(48) maxBlockHeight := int64(48)


+ 2
- 2
node/setup.go View File

@ -362,7 +362,7 @@ func createBlockchainReactor(
reactor, err := bcv0.NewReactor( reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor, logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockchainChannel], peerUpdates, blockSync,
channels[bcv0.BlockSyncChannel], peerUpdates, blockSync,
metrics, metrics,
) )
if err != nil { if err != nil {
@ -727,7 +727,7 @@ func makeNodeInfo(
var bcChannel byte var bcChannel byte
switch config.BlockSync.Version { switch config.BlockSync.Version {
case cfg.BlockSyncV0: case cfg.BlockSyncV0:
bcChannel = byte(bcv0.BlockchainChannel)
bcChannel = byte(bcv0.BlockSyncChannel)
case cfg.BlockSyncV2: case cfg.BlockSyncV2:
bcChannel = bcv2.BlockchainChannel bcChannel = bcv2.BlockchainChannel


Loading…
Cancel
Save