Browse Source

Merge pull request #1049 from tendermint/p2p-channels

p2p: add Channels to NodeInfo and don't send for unknown channels
pull/1154/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
27ef3489a0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 37 deletions
  1. +25
    -15
      node/node.go
  2. +43
    -5
      p2p/node_info.go
  3. +28
    -2
      p2p/peer.go
  4. +12
    -8
      p2p/peer_test.go
  5. +11
    -7
      p2p/test_util.go

+ 25
- 15
node/node.go View File

@ -18,7 +18,7 @@ import (
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/evidence"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
@ -104,14 +104,14 @@ type Node struct {
// services
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *consensus.ConsensusState // latest consensus state
consensusReactor *consensus.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
}
@ -159,7 +159,7 @@ func NewNode(config *cfg.Config,
// and sync tendermint and the app by performing a handshake
// and replaying any necessary blocks
consensusLogger := logger.With("module", "consensus")
handshaker := consensus.NewHandshaker(stateDB, state, blockStore)
handshaker := cs.NewHandshaker(stateDB, state, blockStore)
handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy"))
@ -220,13 +220,13 @@ func NewNode(config *cfg.Config,
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make ConsensusReactor
consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(),
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor.SetLogger(consensusLogger)
p2pLogger := logger.With("module", "p2p")
@ -503,12 +503,12 @@ func (n *Node) BlockStore() *bc.BlockStore {
}
// ConsensusState returns the Node's ConsensusState.
func (n *Node) ConsensusState() *consensus.ConsensusState {
func (n *Node) ConsensusState() *cs.ConsensusState {
return n.consensusState
}
// ConsensusReactor returns the Node's ConsensusReactor.
func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
return n.consensusReactor
}
@ -552,16 +552,26 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo {
PubKey: pubKey,
Network: n.genesisDoc.ChainID,
Version: version.Version,
Channels: []byte{
bc.BlockchainChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,
},
Moniker: n.config.Moniker,
Other: []string{
cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("p2p_version=%v", p2p.Version),
cmn.Fmt("consensus_version=%v", consensus.Version),
cmn.Fmt("consensus_version=%v", cs.Version),
cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
cmn.Fmt("tx_index=%v", txIndexerStatus),
},
}
if n.config.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
rpcListenAddr := n.config.RPC.ListenAddress
nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))


+ 43
- 5
p2p/node_info.go View File

@ -7,7 +7,10 @@ import (
crypto "github.com/tendermint/go-crypto"
)
const maxNodeInfoSize = 10240 // 10Kb
const (
maxNodeInfoSize = 10240 // 10Kb
maxNumChannels = 16 // plenty of room for upgrades, for now
)
func MaxNodeInfoSize() int {
return maxNodeInfoSize
@ -21,8 +24,9 @@ type NodeInfo struct {
ListenAddr string `json:"listen_addr"` // accepting incoming
// Check compatibility
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels []byte `json:"channels"` // channels this node knows about
// Sanitize
Moniker string `json:"moniker"` // arbitrary moniker
@ -30,18 +34,33 @@ type NodeInfo struct {
}
// Validate checks the self-reported NodeInfo is safe.
// It returns an error if the info.PubKey doesn't match the given pubKey.
// It returns an error if the info.PubKey doesn't match the given pubKey,
// or if there are too many Channels or any duplicate Channels.
// TODO: constraints for Moniker/Other? Or is that for the UI ?
func (info NodeInfo) Validate(pubKey crypto.PubKey) error {
if !info.PubKey.Equals(pubKey) {
return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)",
info.PubKey, pubKey)
}
if len(info.Channels) > maxNumChannels {
return fmt.Errorf("info.Channels is too long (%v). Max is %v", len(info.Channels), maxNumChannels)
}
channels := make(map[byte]struct{})
for _, ch := range info.Channels {
_, ok := channels[ch]
if ok {
return fmt.Errorf("info.Channels contains duplicate channel id %v", ch)
}
channels[ch] = struct{}{}
}
return nil
}
// CompatibleWith checks if two NodeInfo are compatible with eachother.
// CONTRACT: two nodes are compatible if the major/minor versions match and network match.
// CONTRACT: two nodes are compatible if the major/minor versions match and network match
// and they have at least one channel in common.
func (info NodeInfo) CompatibleWith(other NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(info.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version)
@ -71,6 +90,25 @@ func (info NodeInfo) CompatibleWith(other NodeInfo) error {
return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network)
}
// if we have no channels, we're just testing
if len(info.Channels) == 0 {
return nil
}
// for each of our channels, check if they have it
found := false
OUTER_LOOP:
for _, ch1 := range info.Channels {
for _, ch2 := range other.Channels {
if ch1 == ch2 {
found = true
break OUTER_LOOP // only need one
}
}
}
if !found {
return fmt.Errorf("Peer has no common channels. Our channels: %v ; Peer channels: %v", info.Channels, other.Channels)
}
return nil
}


+ 28
- 2
p2p/peer.go View File

@ -48,7 +48,8 @@ type peer struct {
persistent bool
config *PeerConfig
nodeInfo NodeInfo
nodeInfo NodeInfo // peer's node info
channels []byte // channels the peer knows about
Data *cmn.CMap // User data.
}
@ -204,6 +205,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
return false
}
return p.mconn.Send(chID, msg)
}
@ -213,6 +216,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool {
func (p *peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
return p.mconn.TrySend(chID, msg)
}
@ -227,6 +232,20 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
}
}
// NOTE: probably will want to remove this
// but could be helpful while the feature is new
p.Logger.Debug("Unknown channel for peer", "channel", chID, "channels", p.channels)
return false
}
//---------------------------------------------------
// methods used by the Switch
@ -269,10 +288,17 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err
return errors.Wrap(err, "Error removing deadline")
}
p.nodeInfo = peerNodeInfo
p.setNodeInfo(peerNodeInfo)
return nil
}
func (p *peer) setNodeInfo(nodeInfo NodeInfo) {
p.nodeInfo = nodeInfo
// cache the channels so we dont copy nodeInfo
// every time we check hasChannel
p.channels = nodeInfo.Channels
}
// Addr returns peer's remote network address.
func (p *peer) Addr() net.Addr {
return p.conn.RemoteAddr()


+ 12
- 8
p2p/peer_test.go View File

@ -13,6 +13,8 @@ import (
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
const testCh = 0x01
func TestPeerBasic(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -77,25 +79,26 @@ func TestPeerSend(t *testing.T) {
defer p.Stop()
assert.True(p.CanSend(0x01))
assert.True(p.Send(0x01, "Asylum"))
assert.True(p.CanSend(testCh))
assert.True(p.Send(testCh, "Asylum"))
}
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) {
chDescs := []*tmconn.ChannelDescriptor{
{ID: 0x01, Priority: 1},
{ID: testCh, Priority: 1},
}
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
pk := crypto.GenPrivKeyEd25519().Wrap()
p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config, false)
if err != nil {
return nil, err
}
err = p.HandshakeTimeout(NodeInfo{
PubKey: pk.PubKey(),
Moniker: "host_peer",
Network: "testing",
Version: "123.123.123",
PubKey: pk.PubKey(),
Moniker: "host_peer",
Network: "testing",
Version: "123.123.123",
Channels: []byte{testCh},
}, 1*time.Second)
if err != nil {
return nil, err
@ -148,6 +151,7 @@ func (p *remotePeer) accept(l net.Listener) {
Network: "testing",
Version: "123.123.123",
ListenAddr: l.Addr().String(),
Channels: []byte{testCh},
}, 1*time.Second)
if err != nil {
golog.Fatalf("Failed to perform handshake: %+v", err)


+ 11
- 7
p2p/test_util.go View File

@ -132,16 +132,20 @@ func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
nodeKey := &NodeKey{
PrivKey: crypto.GenPrivKeyEd25519().Wrap(),
}
s := NewSwitch(cfg)
s.SetLogger(log.TestingLogger())
s = initSwitch(i, s)
s.SetNodeInfo(NodeInfo{
sw := NewSwitch(cfg)
sw.SetLogger(log.TestingLogger())
sw = initSwitch(i, sw)
ni := NodeInfo{
PubKey: nodeKey.PubKey(),
Moniker: cmn.Fmt("switch%d", i),
Network: network,
Version: version,
ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
})
s.SetNodeKey(nodeKey)
return s
}
for ch, _ := range sw.reactorsByCh {
ni.Channels = append(ni.Channels, ch)
}
sw.SetNodeInfo(ni)
sw.SetNodeKey(nodeKey)
return sw
}

Loading…
Cancel
Save