Browse Source

consensus+p2p: change how consensus reactor is constructed (#7525)

pull/7529/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
10402b728f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 64 additions and 35 deletions
  1. +4
    -1
      internal/consensus/byzantine_test.go
  2. +29
    -12
      internal/consensus/reactor.go
  3. +20
    -5
      internal/consensus/reactor_test.go
  4. +5
    -0
      internal/p2p/router.go
  5. +6
    -17
      node/setup.go

+ 4
- 1
internal/consensus/byzantine_test.go View File

@ -256,10 +256,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
msg, err := s.Next(ctx)
if !assert.NoError(t, err) {
assert.NoError(t, err)
if err != nil {
cancel()
return
}
require.NotNil(t, msg)
block := msg.Data().(types.EventDataNewBlock).Block
if len(block.Evidence.Evidence) != 0 {


+ 29
- 12
internal/consensus/reactor.go View File

@ -28,9 +28,9 @@ var (
// GetChannelDescriptor produces an instance of a descriptor for this
// package's required channels.
func GetChannelDescriptors() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
StateChannel: {
ID: StateChannel,
MessageType: new(tmcons.Message),
Priority: 8,
@ -38,7 +38,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
},
{
DataChannel: {
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
@ -49,7 +49,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
},
{
VoteChannel: {
ID: VoteChannel,
MessageType: new(tmcons.Message),
Priority: 10,
@ -57,7 +57,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
},
{
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
MessageType: new(tmcons.Message),
Priority: 5,
@ -131,17 +131,34 @@ type Reactor struct {
// to relevant p2p Channels and a channel to listen for peer updates on. The
// reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
logger log.Logger,
cs *State,
stateCh *p2p.Channel,
dataCh *p2p.Channel,
voteCh *p2p.Channel,
voteSetBitsCh *p2p.Channel,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
waitSync bool,
metrics *Metrics,
) *Reactor {
) (*Reactor, error) {
chans := getChannelDescriptors()
stateCh, err := channelCreator(ctx, chans[StateChannel])
if err != nil {
return nil, err
}
dataCh, err := channelCreator(ctx, chans[DataChannel])
if err != nil {
return nil, err
}
voteCh, err := channelCreator(ctx, chans[VoteChannel])
if err != nil {
return nil, err
}
voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel])
if err != nil {
return nil, err
}
r := &Reactor{
logger: logger,
state: cs,
@ -156,7 +173,7 @@ func NewReactor(
}
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
return r
return r, nil
}
// OnStart starts separate go routines for each p2p Channel and listens for


+ 20
- 5
internal/consensus/reactor_test.go View File

@ -83,21 +83,36 @@ func setup(
ctx, cancel := context.WithCancel(ctx)
// Canceled during cleanup (see below).
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
switch desc.ID {
case StateChannel:
return rts.stateChannels[nodeID], nil
case DataChannel:
return rts.dataChannels[nodeID], nil
case VoteChannel:
return rts.voteChannels[nodeID], nil
case VoteSetBitsChannel:
return rts.voteSetBitsChannels[nodeID], nil
default:
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
}
}
}
i := 0
for nodeID, node := range rts.network.Nodes {
state := states[i]
reactor := NewReactor(
reactor, err := NewReactor(ctx,
state.logger.With("node", nodeID),
state,
rts.stateChannels[nodeID],
rts.dataChannels[nodeID],
rts.voteChannels[nodeID],
rts.voteSetBitsChannels[nodeID],
chCreator(nodeID),
node.MakePeerUpdates(ctx, t),
true,
NopMetrics(),
)
require.NoError(t, err)
reactor.SetEventBus(state.eventBus)


+ 5
- 0
internal/p2p/router.go View File

@ -254,6 +254,11 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
}
}
// ChannelCreator allows routers to construct their own channels,
// either by receiving a reference to Router.OpenChannel or using some
// kind shim for testing purposes.
type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel (used for unmarshaling), which can


+ 6
- 17
node/setup.go View File

@ -313,29 +313,18 @@ func createConsensusReactor(
consensusState.SetPrivValidator(ctx, privValidator)
}
csChDesc := consensus.GetChannelDescriptors()
channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc))
for idx := range csChDesc {
chd := csChDesc[idx]
ch, err := router.OpenChannel(ctx, chd)
if err != nil {
return nil, nil, err
}
channels[ch.ID] = ch
}
reactor := consensus.NewReactor(
reactor, err := consensus.NewReactor(
ctx,
logger,
consensusState,
channels[consensus.StateChannel],
channels[consensus.DataChannel],
channels[consensus.VoteChannel],
channels[consensus.VoteSetBitsChannel],
router.OpenChannel,
peerManager.Subscribe(ctx),
waitSync,
csMetrics,
)
if err != nil {
return nil, nil, err
}
// Services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor.


Loading…
Cancel
Save