Browse Source

p2p: add Channels to NodeInfo and don't send for unknown channels

pull/1049/head
Ethan Buchman 7 years ago
parent
commit
50129ad8ac
3 changed files with 92 additions and 22 deletions
  1. +25
    -15
      node/node.go
  2. +42
    -5
      p2p/node_info.go
  3. +25
    -2
      p2p/peer.go

+ 25
- 15
node/node.go View File

@ -18,7 +18,7 @@ import (
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/evidence"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
@ -104,14 +104,14 @@ type Node struct {
// services // services
eventBus *types.EventBus // pub/sub for services eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *consensus.ConsensusState // latest consensus state
consensusReactor *consensus.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService indexerService *txindex.IndexerService
} }
@ -159,7 +159,7 @@ func NewNode(config *cfg.Config,
// and sync tendermint and the app by performing a handshake // and sync tendermint and the app by performing a handshake
// and replaying any necessary blocks // and replaying any necessary blocks
consensusLogger := logger.With("module", "consensus") consensusLogger := logger.With("module", "consensus")
handshaker := consensus.NewHandshaker(stateDB, state, blockStore)
handshaker := cs.NewHandshaker(stateDB, state, blockStore)
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
@ -220,13 +220,13 @@ func NewNode(config *cfg.Config,
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make ConsensusReactor // Make ConsensusReactor
consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(),
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
blockExec, blockStore, mempool, evidencePool) blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger) consensusState.SetLogger(consensusLogger)
if privValidator != nil { if privValidator != nil {
consensusState.SetPrivValidator(privValidator) consensusState.SetPrivValidator(privValidator)
} }
consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor.SetLogger(consensusLogger) consensusReactor.SetLogger(consensusLogger)
p2pLogger := logger.With("module", "p2p") p2pLogger := logger.With("module", "p2p")
@ -503,12 +503,12 @@ func (n *Node) BlockStore() *bc.BlockStore {
} }
// ConsensusState returns the Node's ConsensusState. // ConsensusState returns the Node's ConsensusState.
func (n *Node) ConsensusState() *consensus.ConsensusState {
func (n *Node) ConsensusState() *cs.ConsensusState {
return n.consensusState return n.consensusState
} }
// ConsensusReactor returns the Node's ConsensusReactor. // ConsensusReactor returns the Node's ConsensusReactor.
func (n *Node) ConsensusReactor() *consensus.ConsensusReactor {
func (n *Node) ConsensusReactor() *cs.ConsensusReactor {
return n.consensusReactor return n.consensusReactor
} }
@ -552,16 +552,26 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo {
PubKey: pubKey, PubKey: pubKey,
Network: n.genesisDoc.ChainID, Network: n.genesisDoc.ChainID,
Version: version.Version, Version: version.Version,
Channels: []byte{
bc.BlockchainChannel,
cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
mempl.MempoolChannel,
evidence.EvidenceChannel,
},
Moniker: n.config.Moniker, Moniker: n.config.Moniker,
Other: []string{ Other: []string{
cmn.Fmt("wire_version=%v", wire.Version), cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("p2p_version=%v", p2p.Version), cmn.Fmt("p2p_version=%v", p2p.Version),
cmn.Fmt("consensus_version=%v", consensus.Version),
cmn.Fmt("consensus_version=%v", cs.Version),
cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version),
cmn.Fmt("tx_index=%v", txIndexerStatus), cmn.Fmt("tx_index=%v", txIndexerStatus),
}, },
} }
if n.config.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
rpcListenAddr := n.config.RPC.ListenAddress rpcListenAddr := n.config.RPC.ListenAddress
nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr)) nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))


+ 42
- 5
p2p/node_info.go View File

@ -7,7 +7,10 @@ import (
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
) )
const maxNodeInfoSize = 10240 // 10Kb
const (
maxNodeInfoSize = 10240 // 10Kb
maxNumChannels = 16 // plenty of room for upgrades, for now
)
func MaxNodeInfoSize() int { func MaxNodeInfoSize() int {
return maxNodeInfoSize return maxNodeInfoSize
@ -21,8 +24,9 @@ type NodeInfo struct {
ListenAddr string `json:"listen_addr"` // accepting incoming ListenAddr string `json:"listen_addr"` // accepting incoming
// Check compatibility // Check compatibility
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels []byte `json:"channels"` // channels this node knows about
// Sanitize // Sanitize
Moniker string `json:"moniker"` // arbitrary moniker Moniker string `json:"moniker"` // arbitrary moniker
@ -30,18 +34,33 @@ type NodeInfo struct {
} }
// Validate checks the self-reported NodeInfo is safe. // Validate checks the self-reported NodeInfo is safe.
// It returns an error if the info.PubKey doesn't match the given pubKey.
// It returns an error if the info.PubKey doesn't match the given pubKey,
// or if there are too many Channels or any duplicate Channels.
// TODO: constraints for Moniker/Other? Or is that for the UI ? // TODO: constraints for Moniker/Other? Or is that for the UI ?
func (info NodeInfo) Validate(pubKey crypto.PubKey) error { func (info NodeInfo) Validate(pubKey crypto.PubKey) error {
if !info.PubKey.Equals(pubKey) { if !info.PubKey.Equals(pubKey) {
return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)",
info.PubKey, pubKey) info.PubKey, pubKey)
} }
if len(info.Channels) > maxNumChannels {
return fmt.Errorf("info.Channels is too long (%v). Max is %v", len(info.Channels), maxNumChannels)
}
channels := make(map[byte]struct{})
for _, ch := range info.Channels {
_, ok := channels[ch]
if ok {
return fmt.Errorf("info.Channels contains duplicate channel id %v", ch)
}
channels[ch] = struct{}{}
}
return nil return nil
} }
// CompatibleWith checks if two NodeInfo are compatible with eachother. // CompatibleWith checks if two NodeInfo are compatible with eachother.
// CONTRACT: two nodes are compatible if the major/minor versions match and network match.
// CONTRACT: two nodes are compatible if the major/minor versions match and network match
// and they have at least one channel in common.
func (info NodeInfo) CompatibleWith(other NodeInfo) error { func (info NodeInfo) CompatibleWith(other NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(info.Version) iMajor, iMinor, _, iErr := splitVersion(info.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version) oMajor, oMinor, _, oErr := splitVersion(other.Version)
@ -71,6 +90,24 @@ func (info NodeInfo) CompatibleWith(other NodeInfo) error {
return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network) return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network)
} }
// if we have no channels, we're just testing
if len(info.Channels) == 0 {
return nil
}
// for each of our channels, check if they have it
found := false
for _, ch1 := range info.Channels {
for _, ch2 := range other.Channels {
if ch1 == ch2 {
found = true
break // only need one
}
}
}
if !found {
return fmt.Errorf("Peer has no common channels. Our channels: %v ; Peer channels: %v", info.Channels, other.Channels)
}
return nil return nil
} }


+ 25
- 2
p2p/peer.go View File

@ -48,7 +48,8 @@ type peer struct {
persistent bool persistent bool
config *PeerConfig config *PeerConfig
nodeInfo NodeInfo
nodeInfo NodeInfo // peer's node info
channels []byte // channels the peer knows about
Data *cmn.CMap // User data. Data *cmn.CMap // User data.
} }
@ -204,6 +205,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool {
// see Switch#Broadcast, where we fetch the list of peers and loop over // see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped. // them - while we're looping, one peer may be removed and stopped.
return false return false
} else if !p.hasChannel(chID) {
return false
} }
return p.mconn.Send(chID, msg) return p.mconn.Send(chID, msg)
} }
@ -213,6 +216,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool {
func (p *peer) TrySend(chID byte, msg interface{}) bool { func (p *peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() { if !p.IsRunning() {
return false return false
} else if !p.hasChannel(chID) {
return false
} }
return p.mconn.TrySend(chID, msg) return p.mconn.TrySend(chID, msg)
} }
@ -227,6 +232,17 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data) p.Data.Set(key, data)
} }
// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
}
}
return false
}
//--------------------------------------------------- //---------------------------------------------------
// methods used by the Switch // methods used by the Switch
@ -269,10 +285,17 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err
return errors.Wrap(err, "Error removing deadline") return errors.Wrap(err, "Error removing deadline")
} }
p.nodeInfo = peerNodeInfo
p.setNodeInfo(peerNodeInfo)
return nil return nil
} }
func (p *peer) setNodeInfo(nodeInfo NodeInfo) {
p.nodeInfo = nodeInfo
// cache the channels so we dont copy nodeInfo
// every time we check hasChannel
p.channels = nodeInfo.Channels
}
// Addr returns peer's remote network address. // Addr returns peer's remote network address.
func (p *peer) Addr() net.Addr { func (p *peer) Addr() net.Addr {
return p.conn.RemoteAddr() return p.conn.RemoteAddr()


Loading…
Cancel
Save