Browse Source

p2p: fix using custom channels (#6339)

pull/6351/head
Callum Waters 4 years ago
committed by GitHub
parent
commit
a9ac63510d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 75 additions and 14 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +17
    -2
      node/node.go
  3. +13
    -0
      node/node_test.go
  4. +3
    -1
      p2p/mock/reactor.go
  5. +9
    -4
      p2p/node_info.go
  6. +2
    -1
      p2p/node_info_test.go
  7. +1
    -6
      p2p/peer.go
  8. +13
    -0
      p2p/transport.go
  9. +15
    -0
      p2p/transport_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -24,3 +24,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
### BUG FIXES
- [p2p/node] \#6339 Fix bug with using custom channels (@cmwaters)

+ 17
- 2
node/node.go View File

@ -152,6 +152,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option {
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// and interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
}
}
@ -1240,7 +1255,7 @@ func makeNodeInfo(
txIndexer txindex.TxIndexer,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
) (p2p.DefaultNodeInfo, error) {
txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
@ -1255,7 +1270,7 @@ func makeNodeInfo(
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}
nodeInfo := p2p.DefaultNodeInfo{


+ 13
- 0
node/node_test.go View File

@ -22,6 +22,7 @@ import (
tmrand "github.com/tendermint/tendermint/libs/rand"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
@ -379,6 +380,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)
cr := p2pmock.NewReactor()
cr.Channels = []*conn.ChannelDescriptor{
{
ID: byte(0x31),
Priority: 5,
SendQueueCapacity: 100,
RecvMessageCapacity: 100,
},
}
customBlockchainReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
@ -405,6 +414,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {


+ 3
- 1
p2p/mock/reactor.go View File

@ -8,6 +8,8 @@ import (
type Reactor struct {
p2p.BaseReactor
Channels []*conn.ChannelDescriptor
}
func NewReactor() *Reactor {
@ -17,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} }
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}

+ 9
- 4
p2p/node_info.go View File

@ -1,11 +1,12 @@
package p2p
import (
"bytes"
"errors"
"fmt"
"reflect"
"github.com/tendermint/tendermint/libs/bytes"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
tmstrings "github.com/tendermint/tendermint/libs/strings"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/version"
@ -85,9 +86,9 @@ type DefaultNodeInfo struct {
// Check compatibility.
// Channels are HexBytes so easier to read as JSON
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels bytes.HexBytes `json:"channels"` // channels this node knows about
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about
// ASCIIText fields
Moniker string `json:"moniker"` // arbitrary moniker
@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) {
return NewNetAddressString(idAddr)
}
func (info DefaultNodeInfo) HasChannel(chID byte) bool {
return bytes.Contains(info.Channels, []byte{chID})
}
func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo {
dni := new(tmp2p.DefaultNodeInfo)


+ 2
- 1
p2p/node_info_test.go View File

@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) {
assert.NoError(t, ni1.CompatibleWith(ni2))
// add another channel; still compatible
ni2.Channels = []byte{newTestChannel, testCh}
ni2.Channels = append(ni2.Channels, newTestChannel)
assert.True(t, ni2.HasChannel(newTestChannel))
assert.NoError(t, ni1.CompatibleWith(ni2))
// wrong NodeInfo type is not compatible


+ 1
- 6
p2p/peer.go View File

@ -130,15 +130,10 @@ func newPeer(
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
var channs = make([]byte, 0, len(chDescs))
for _, desc := range chDescs {
channs = append(channs, desc.ID)
}
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: channs,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),


+ 13
- 0
p2p/transport.go View File

@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return nil
}
// AddChannel registers a channel to nodeInfo.
// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated
// This is a bit messy at the moment but is cleaned up in the following version
// when NodeInfo changes from an interface to a concrete type
func (mt *MultiplexTransport) AddChannel(chID byte) {
if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok {
if !ni.HasChannel(chID) {
ni.Channels = append(ni.Channels, chID)
}
mt.nodeInfo = ni
}
}
func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()


+ 15
- 0
p2p/transport_test.go View File

@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) {
}
}
func TestTransportAddChannel(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
testChannel := byte(0x01)
mt.AddChannel(testChannel)
if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) {
t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels)
}
}
// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (


Loading…
Cancel
Save