diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 5cff3e98f..0b1882c69 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -24,3 +24,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES +- [p2p/node] \#6339 Fix bug with using custom channels (@cmwaters) + diff --git a/node/node.go b/node/node.go index 92011ad80..738d3a589 100644 --- a/node/node.go +++ b/node/node.go @@ -152,6 +152,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option { n.sw.RemoveReactor(name, existingReactor) } n.sw.AddReactor(name, reactor) + // register the new channels to the nodeInfo + // NOTE: This is a bit messy now with the type casting but is + // cleaned up in the following version when NodeInfo is changed from + // and interface to a concrete type + if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok { + for _, chDesc := range reactor.GetChannels() { + if !ni.HasChannel(chDesc.ID) { + ni.Channels = append(ni.Channels, chDesc.ID) + n.transport.AddChannel(chDesc.ID) + } + } + n.nodeInfo = ni + } else { + n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.") + } } } } @@ -1240,7 +1255,7 @@ func makeNodeInfo( txIndexer txindex.TxIndexer, genDoc *types.GenesisDoc, state sm.State, -) (p2p.NodeInfo, error) { +) (p2p.DefaultNodeInfo, error) { txIndexerStatus := "on" if _, ok := txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" @@ -1255,7 +1270,7 @@ func makeNodeInfo( case "v2": bcChannel = bcv2.BlockchainChannel default: - return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) } nodeInfo := p2p.DefaultNodeInfo{ diff --git a/node/node_test.go b/node/node_test.go index 8cbf38601..d92edcc6d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -22,6 +22,7 @@ import ( tmrand "github.com/tendermint/tendermint/libs/rand" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" p2pmock "github.com/tendermint/tendermint/p2p/mock" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" @@ -379,6 +380,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { defer os.RemoveAll(config.RootDir) cr := p2pmock.NewReactor() + cr.Channels = []*conn.ChannelDescriptor{ + { + ID: byte(0x31), + Priority: 5, + SendQueueCapacity: 100, + RecvMessageCapacity: 100, + }, + } customBlockchainReactor := p2pmock.NewReactor() nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) @@ -405,6 +414,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { assert.True(t, customBlockchainReactor.IsRunning()) assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN")) + + channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels + assert.Contains(t, channels, mempl.MempoolChannel) + assert.Contains(t, channels, cr.Channels[0].ID) } func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) { diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 40f918e9f..0389a7d19 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -8,6 +8,8 @@ import ( type Reactor struct { p2p.BaseReactor + + Channels []*conn.ChannelDescriptor } func NewReactor() *Reactor { @@ -17,7 +19,7 @@ func NewReactor() *Reactor { return r } -func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} } +func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} diff --git a/p2p/node_info.go b/p2p/node_info.go index 8acc23676..6b0046804 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -1,11 +1,12 @@ package p2p import ( + "bytes" "errors" "fmt" "reflect" - "github.com/tendermint/tendermint/libs/bytes" + tmbytes "github.com/tendermint/tendermint/libs/bytes" tmstrings "github.com/tendermint/tendermint/libs/strings" tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/version" @@ -85,9 +86,9 @@ type DefaultNodeInfo struct { // Check compatibility. // Channels are HexBytes so easier to read as JSON - Network string `json:"network"` // network/chain ID - Version string `json:"version"` // major.minor.revision - Channels bytes.HexBytes `json:"channels"` // channels this node knows about + Network string `json:"network"` // network/chain ID + Version string `json:"version"` // major.minor.revision + Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about // ASCIIText fields Moniker string `json:"moniker"` // arbitrary moniker @@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) { return NewNetAddressString(idAddr) } +func (info DefaultNodeInfo) HasChannel(chID byte) bool { + return bytes.Contains(info.Channels, []byte{chID}) +} + func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo { dni := new(tmp2p.DefaultNodeInfo) diff --git a/p2p/node_info_test.go b/p2p/node_info_test.go index c34e71230..1bceb4a10 100644 --- a/p2p/node_info_test.go +++ b/p2p/node_info_test.go @@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) { assert.NoError(t, ni1.CompatibleWith(ni2)) // add another channel; still compatible - ni2.Channels = []byte{newTestChannel, testCh} + ni2.Channels = append(ni2.Channels, newTestChannel) + assert.True(t, ni2.HasChannel(newTestChannel)) assert.NoError(t, ni1.CompatibleWith(ni2)) // wrong NodeInfo type is not compatible diff --git a/p2p/peer.go b/p2p/peer.go index c7a0b3525..21ebac678 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -130,15 +130,10 @@ func newPeer( onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { - var channs = make([]byte, 0, len(chDescs)) - for _, desc := range chDescs { - channs = append(channs, desc.ID) - } - p := &peer{ peerConn: pc, nodeInfo: nodeInfo, - channels: channs, + channels: nodeInfo.(DefaultNodeInfo).Channels, Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), diff --git a/p2p/transport.go b/p2p/transport.go index 8448888cb..1257f38b3 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error { return nil } +// AddChannel registers a channel to nodeInfo. +// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated +// This is a bit messy at the moment but is cleaned up in the following version +// when NodeInfo changes from an interface to a concrete type +func (mt *MultiplexTransport) AddChannel(chID byte) { + if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok { + if !ni.HasChannel(chID) { + ni.Channels = append(ni.Channels, chID) + } + mt.nodeInfo = ni + } +} + func (mt *MultiplexTransport) acceptPeers() { for { c, err := mt.listener.Accept() diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 77db5a3bd..7638de4cb 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) { } } +func TestTransportAddChannel(t *testing.T) { + mt := newMultiplexTransport( + emptyNodeInfo(), + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + testChannel := byte(0x01) + + mt.AddChannel(testChannel) + if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) { + t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels) + } +} + // create listener func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { var (