diff --git a/node/node.go b/node/node.go index bdbf12f82..b02012f98 100644 --- a/node/node.go +++ b/node/node.go @@ -18,7 +18,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/consensus" + cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/evidence" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" @@ -104,14 +104,14 @@ type Node struct { // services eventBus *types.EventBus // pub/sub for services stateDB dbm.DB - blockStore *bc.BlockStore // store the blockchain to disk - bcReactor *bc.BlockchainReactor // for fast-syncing - mempoolReactor *mempl.MempoolReactor // for gossipping transactions - consensusState *consensus.ConsensusState // latest consensus state - consensusReactor *consensus.ConsensusReactor // for participating in the consensus - evidencePool *evidence.EvidencePool // tracking evidence - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers + blockStore *bc.BlockStore // store the blockchain to disk + bcReactor *bc.BlockchainReactor // for fast-syncing + mempoolReactor *mempl.MempoolReactor // for gossipping transactions + consensusState *cs.ConsensusState // latest consensus state + consensusReactor *cs.ConsensusReactor // for participating in the consensus + evidencePool *evidence.EvidencePool // tracking evidence + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer indexerService *txindex.IndexerService } @@ -159,7 +159,7 @@ func NewNode(config *cfg.Config, // and sync tendermint and the app by performing a handshake // and replaying any necessary blocks consensusLogger := logger.With("module", "consensus") - handshaker := consensus.NewHandshaker(stateDB, state, blockStore) + handshaker := cs.NewHandshaker(stateDB, state, blockStore) handshaker.SetLogger(consensusLogger) proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -220,13 +220,13 @@ func NewNode(config *cfg.Config, bcReactor.SetLogger(logger.With("module", "blockchain")) // Make ConsensusReactor - consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), + consensusState := cs.NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evidencePool) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync) + consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) consensusReactor.SetLogger(consensusLogger) p2pLogger := logger.With("module", "p2p") @@ -503,12 +503,12 @@ func (n *Node) BlockStore() *bc.BlockStore { } // ConsensusState returns the Node's ConsensusState. -func (n *Node) ConsensusState() *consensus.ConsensusState { +func (n *Node) ConsensusState() *cs.ConsensusState { return n.consensusState } // ConsensusReactor returns the Node's ConsensusReactor. -func (n *Node) ConsensusReactor() *consensus.ConsensusReactor { +func (n *Node) ConsensusReactor() *cs.ConsensusReactor { return n.consensusReactor } @@ -552,16 +552,26 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo { PubKey: pubKey, Network: n.genesisDoc.ChainID, Version: version.Version, + Channels: []byte{ + bc.BlockchainChannel, + cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, + mempl.MempoolChannel, + evidence.EvidenceChannel, + }, Moniker: n.config.Moniker, Other: []string{ cmn.Fmt("wire_version=%v", wire.Version), cmn.Fmt("p2p_version=%v", p2p.Version), - cmn.Fmt("consensus_version=%v", consensus.Version), + cmn.Fmt("consensus_version=%v", cs.Version), cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), cmn.Fmt("tx_index=%v", txIndexerStatus), }, } + if n.config.P2P.PexReactor { + nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel) + } + rpcListenAddr := n.config.RPC.ListenAddress nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr)) diff --git a/p2p/node_info.go b/p2p/node_info.go index 72873add8..205c63ac8 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -7,7 +7,10 @@ import ( crypto "github.com/tendermint/go-crypto" ) -const maxNodeInfoSize = 10240 // 10Kb +const ( + maxNodeInfoSize = 10240 // 10Kb + maxNumChannels = 16 // plenty of room for upgrades, for now +) func MaxNodeInfoSize() int { return maxNodeInfoSize @@ -21,8 +24,9 @@ type NodeInfo struct { ListenAddr string `json:"listen_addr"` // accepting incoming // Check compatibility - Network string `json:"network"` // network/chain ID - Version string `json:"version"` // major.minor.revision + Network string `json:"network"` // network/chain ID + Version string `json:"version"` // major.minor.revision + Channels []byte `json:"channels"` // channels this node knows about // Sanitize Moniker string `json:"moniker"` // arbitrary moniker @@ -30,18 +34,33 @@ type NodeInfo struct { } // Validate checks the self-reported NodeInfo is safe. -// It returns an error if the info.PubKey doesn't match the given pubKey. +// It returns an error if the info.PubKey doesn't match the given pubKey, +// or if there are too many Channels or any duplicate Channels. // TODO: constraints for Moniker/Other? Or is that for the UI ? func (info NodeInfo) Validate(pubKey crypto.PubKey) error { if !info.PubKey.Equals(pubKey) { return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", info.PubKey, pubKey) } + + if len(info.Channels) > maxNumChannels { + return fmt.Errorf("info.Channels is too long (%v). Max is %v", len(info.Channels), maxNumChannels) + } + + channels := make(map[byte]struct{}) + for _, ch := range info.Channels { + _, ok := channels[ch] + if ok { + return fmt.Errorf("info.Channels contains duplicate channel id %v", ch) + } + channels[ch] = struct{}{} + } return nil } // CompatibleWith checks if two NodeInfo are compatible with eachother. -// CONTRACT: two nodes are compatible if the major/minor versions match and network match. +// CONTRACT: two nodes are compatible if the major/minor versions match and network match +// and they have at least one channel in common. func (info NodeInfo) CompatibleWith(other NodeInfo) error { iMajor, iMinor, _, iErr := splitVersion(info.Version) oMajor, oMinor, _, oErr := splitVersion(other.Version) @@ -71,6 +90,24 @@ func (info NodeInfo) CompatibleWith(other NodeInfo) error { return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network) } + // if we have no channels, we're just testing + if len(info.Channels) == 0 { + return nil + } + + // for each of our channels, check if they have it + found := false + for _, ch1 := range info.Channels { + for _, ch2 := range other.Channels { + if ch1 == ch2 { + found = true + break // only need one + } + } + } + if !found { + return fmt.Errorf("Peer has no common channels. Our channels: %v ; Peer channels: %v", info.Channels, other.Channels) + } return nil } diff --git a/p2p/peer.go b/p2p/peer.go index e427b0d95..1fa937afc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -48,7 +48,8 @@ type peer struct { persistent bool config *PeerConfig - nodeInfo NodeInfo + nodeInfo NodeInfo // peer's node info + channels []byte // channels the peer knows about Data *cmn.CMap // User data. } @@ -204,6 +205,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool { // see Switch#Broadcast, where we fetch the list of peers and loop over // them - while we're looping, one peer may be removed and stopped. return false + } else if !p.hasChannel(chID) { + return false } return p.mconn.Send(chID, msg) } @@ -213,6 +216,8 @@ func (p *peer) Send(chID byte, msg interface{}) bool { func (p *peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false + } else if !p.hasChannel(chID) { + return false } return p.mconn.TrySend(chID, msg) } @@ -227,6 +232,17 @@ func (p *peer) Set(key string, data interface{}) { p.Data.Set(key, data) } +// hasChannel returns true if the peer reported +// knowing about the given chID. +func (p *peer) hasChannel(chID byte) bool { + for _, ch := range p.channels { + if ch == chID { + return true + } + } + return false +} + //--------------------------------------------------- // methods used by the Switch @@ -269,10 +285,17 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err return errors.Wrap(err, "Error removing deadline") } - p.nodeInfo = peerNodeInfo + p.setNodeInfo(peerNodeInfo) return nil } +func (p *peer) setNodeInfo(nodeInfo NodeInfo) { + p.nodeInfo = nodeInfo + // cache the channels so we dont copy nodeInfo + // every time we check hasChannel + p.channels = nodeInfo.Channels +} + // Addr returns peer's remote network address. func (p *peer) Addr() net.Addr { return p.conn.RemoteAddr()