|
@ -12,6 +12,7 @@ import ( |
|
|
crypto "github.com/tendermint/go-crypto" |
|
|
crypto "github.com/tendermint/go-crypto" |
|
|
cfg "github.com/tendermint/tendermint/config" |
|
|
cfg "github.com/tendermint/tendermint/config" |
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
|
|
"github.com/tendermint/tmlibs/log" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
@ -81,17 +82,18 @@ type Switch struct { |
|
|
reactorsByCh map[byte]Reactor |
|
|
reactorsByCh map[byte]Reactor |
|
|
peers *PeerSet |
|
|
peers *PeerSet |
|
|
dialing *cmn.CMap |
|
|
dialing *cmn.CMap |
|
|
nodeInfo *NodeInfo // our node info
|
|
|
|
|
|
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
|
|
|
|
|
|
|
|
|
nodeInfo *NodeInfo // our node info
|
|
|
|
|
|
nodeKey *NodeKey // our node privkey
|
|
|
|
|
|
|
|
|
filterConnByAddr func(net.Addr) error |
|
|
filterConnByAddr func(net.Addr) error |
|
|
filterConnByPubKey func(crypto.PubKeyEd25519) error |
|
|
|
|
|
|
|
|
filterConnByPubKey func(crypto.PubKey) error |
|
|
|
|
|
|
|
|
rng *rand.Rand // seed for randomizing dial times and orders
|
|
|
rng *rand.Rand // seed for randomizing dial times and orders
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var ( |
|
|
var ( |
|
|
ErrSwitchDuplicatePeer = errors.New("Duplicate peer") |
|
|
ErrSwitchDuplicatePeer = errors.New("Duplicate peer") |
|
|
|
|
|
ErrSwitchConnectToSelf = errors.New("Connect to self") |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
func NewSwitch(config *cfg.P2PConfig) *Switch { |
|
|
func NewSwitch(config *cfg.P2PConfig) *Switch { |
|
@ -181,13 +183,13 @@ func (sw *Switch) NodeInfo() *NodeInfo { |
|
|
return sw.nodeInfo |
|
|
return sw.nodeInfo |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// SetNodePrivKey sets the switch's private key for authenticated encryption.
|
|
|
|
|
|
|
|
|
// SetNodeKey sets the switch's private key for authenticated encryption.
|
|
|
// NOTE: Overwrites sw.nodeInfo.PubKey.
|
|
|
// NOTE: Overwrites sw.nodeInfo.PubKey.
|
|
|
// NOTE: Not goroutine safe.
|
|
|
// NOTE: Not goroutine safe.
|
|
|
func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { |
|
|
|
|
|
sw.nodePrivKey = nodePrivKey |
|
|
|
|
|
|
|
|
func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { |
|
|
|
|
|
sw.nodeKey = nodeKey |
|
|
if sw.nodeInfo != nil { |
|
|
if sw.nodeInfo != nil { |
|
|
sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519) |
|
|
|
|
|
|
|
|
sw.nodeInfo.PubKey = nodeKey.PubKey() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -231,11 +233,15 @@ func (sw *Switch) OnStop() { |
|
|
// NOTE: This performs a blocking handshake before the peer is added.
|
|
|
// NOTE: This performs a blocking handshake before the peer is added.
|
|
|
// NOTE: If error is returned, caller is responsible for calling peer.CloseConn()
|
|
|
// NOTE: If error is returned, caller is responsible for calling peer.CloseConn()
|
|
|
func (sw *Switch) addPeer(peer *peer) error { |
|
|
func (sw *Switch) addPeer(peer *peer) error { |
|
|
|
|
|
// Avoid self
|
|
|
|
|
|
if sw.nodeKey.ID() == peer.ID() { |
|
|
|
|
|
return ErrSwitchConnectToSelf |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Filter peer against white list
|
|
|
if err := sw.FilterConnByAddr(peer.Addr()); err != nil { |
|
|
if err := sw.FilterConnByAddr(peer.Addr()); err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { |
|
|
if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
@ -244,9 +250,10 @@ func (sw *Switch) addPeer(peer *peer) error { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Avoid self
|
|
|
|
|
|
if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) { |
|
|
|
|
|
return errors.New("Ignoring connection from self") |
|
|
|
|
|
|
|
|
// Avoid duplicate
|
|
|
|
|
|
if sw.peers.Has(peer.ID()) { |
|
|
|
|
|
return ErrSwitchDuplicatePeer |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check version, chain id
|
|
|
// Check version, chain id
|
|
@ -254,12 +261,6 @@ func (sw *Switch) addPeer(peer *peer) error { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Check for duplicate peer
|
|
|
|
|
|
if sw.peers.Has(peer.Key()) { |
|
|
|
|
|
return ErrSwitchDuplicatePeer |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Start peer
|
|
|
// Start peer
|
|
|
if sw.IsRunning() { |
|
|
if sw.IsRunning() { |
|
|
sw.startInitPeer(peer) |
|
|
sw.startInitPeer(peer) |
|
@ -285,7 +286,7 @@ func (sw *Switch) FilterConnByAddr(addr net.Addr) error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// FilterConnByPubKey returns an error if connecting to the given public key is forbidden.
|
|
|
// FilterConnByPubKey returns an error if connecting to the given public key is forbidden.
|
|
|
func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { |
|
|
|
|
|
|
|
|
func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKey) error { |
|
|
if sw.filterConnByPubKey != nil { |
|
|
if sw.filterConnByPubKey != nil { |
|
|
return sw.filterConnByPubKey(pubkey) |
|
|
return sw.filterConnByPubKey(pubkey) |
|
|
} |
|
|
} |
|
@ -299,7 +300,7 @@ func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// SetPubKeyFilter sets the function for filtering connections by public key.
|
|
|
// SetPubKeyFilter sets the function for filtering connections by public key.
|
|
|
func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { |
|
|
|
|
|
|
|
|
func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKey) error) { |
|
|
sw.filterConnByPubKey = f |
|
|
sw.filterConnByPubKey = f |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -318,6 +319,7 @@ func (sw *Switch) startInitPeer(peer *peer) { |
|
|
// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
|
|
|
// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
|
|
|
func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error { |
|
|
func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error { |
|
|
netAddrs, errs := NewNetAddressStrings(peers) |
|
|
netAddrs, errs := NewNetAddressStrings(peers) |
|
|
|
|
|
// TODO: IDs
|
|
|
for _, err := range errs { |
|
|
for _, err := range errs { |
|
|
sw.Logger.Error("Error in peer's address", "err", err) |
|
|
sw.Logger.Error("Error in peer's address", "err", err) |
|
|
} |
|
|
} |
|
@ -362,16 +364,24 @@ func (sw *Switch) randomSleep(interval time.Duration) { |
|
|
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully.
|
|
|
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully.
|
|
|
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
|
|
|
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
|
|
|
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { |
|
|
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { |
|
|
sw.dialing.Set(addr.IP.String(), addr) |
|
|
|
|
|
defer sw.dialing.Delete(addr.IP.String()) |
|
|
|
|
|
|
|
|
sw.dialing.Set(string(addr.ID), addr) |
|
|
|
|
|
defer sw.dialing.Delete(string(addr.ID)) |
|
|
|
|
|
|
|
|
sw.Logger.Info("Dialing peer", "address", addr) |
|
|
sw.Logger.Info("Dialing peer", "address", addr) |
|
|
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) |
|
|
|
|
|
|
|
|
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) |
|
|
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
|
peer.SetLogger(sw.Logger.With("peer", addr)) |
|
|
peer.SetLogger(sw.Logger.With("peer", addr)) |
|
|
|
|
|
|
|
|
|
|
|
// authenticate peer
|
|
|
|
|
|
if addr.ID == "" { |
|
|
|
|
|
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) |
|
|
|
|
|
} else if addr.ID != peer.ID() { |
|
|
|
|
|
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if persistent { |
|
|
if persistent { |
|
|
peer.makePersistent() |
|
|
peer.makePersistent() |
|
|
} |
|
|
} |
|
@ -385,9 +395,9 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, |
|
|
return peer, nil |
|
|
return peer, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// IsDialing returns true if the switch is currently dialing the given address.
|
|
|
|
|
|
func (sw *Switch) IsDialing(addr *NetAddress) bool { |
|
|
|
|
|
return sw.dialing.Has(addr.IP.String()) |
|
|
|
|
|
|
|
|
// IsDialing returns true if the switch is currently dialing the given ID.
|
|
|
|
|
|
func (sw *Switch) IsDialing(id ID) bool { |
|
|
|
|
|
return sw.dialing.Has(string(id)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Broadcast runs a go routine for each attempted send, which will block
|
|
|
// Broadcast runs a go routine for each attempted send, which will block
|
|
@ -443,7 +453,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { |
|
|
// If no success after all that, it stops trying, and leaves it
|
|
|
// If no success after all that, it stops trying, and leaves it
|
|
|
// to the PEX/Addrbook to find the peer again
|
|
|
// to the PEX/Addrbook to find the peer again
|
|
|
func (sw *Switch) reconnectToPeer(peer Peer) { |
|
|
func (sw *Switch) reconnectToPeer(peer Peer) { |
|
|
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) |
|
|
|
|
|
|
|
|
netAddr := peer.NodeInfo().NetAddress() |
|
|
start := time.Now() |
|
|
start := time.Now() |
|
|
sw.Logger.Info("Reconnecting to peer", "peer", peer) |
|
|
sw.Logger.Info("Reconnecting to peer", "peer", peer) |
|
|
for i := 0; i < reconnectAttempts; i++ { |
|
|
for i := 0; i < reconnectAttempts; i++ { |
|
@ -451,7 +461,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
|
|
|
|
|
peer, err := sw.DialPeerWithAddress(netAddr, true) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
// sleep a set amount
|
|
|
// sleep a set amount
|
|
@ -473,7 +483,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) { |
|
|
// sleep an exponentially increasing amount
|
|
|
// sleep an exponentially increasing amount
|
|
|
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) |
|
|
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) |
|
|
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) |
|
|
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) |
|
|
peer, err := sw.DialPeerWithAddress(addr, true) |
|
|
|
|
|
|
|
|
peer, err := sw.DialPeerWithAddress(netAddr, true) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) |
|
|
continue |
|
|
continue |
|
@ -594,24 +604,26 @@ func StartSwitches(switches []*Switch) error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { |
|
|
func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { |
|
|
privKey := crypto.GenPrivKeyEd25519() |
|
|
|
|
|
// new switch, add reactors
|
|
|
// new switch, add reactors
|
|
|
// TODO: let the config be passed in?
|
|
|
// TODO: let the config be passed in?
|
|
|
|
|
|
nodeKey := &NodeKey{ |
|
|
|
|
|
PrivKey: crypto.GenPrivKeyEd25519().Wrap(), |
|
|
|
|
|
} |
|
|
s := initSwitch(i, NewSwitch(cfg)) |
|
|
s := initSwitch(i, NewSwitch(cfg)) |
|
|
s.SetNodeInfo(&NodeInfo{ |
|
|
s.SetNodeInfo(&NodeInfo{ |
|
|
PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519), |
|
|
|
|
|
|
|
|
PubKey: nodeKey.PubKey(), |
|
|
Moniker: cmn.Fmt("switch%d", i), |
|
|
Moniker: cmn.Fmt("switch%d", i), |
|
|
Network: network, |
|
|
Network: network, |
|
|
Version: version, |
|
|
Version: version, |
|
|
RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), |
|
|
|
|
|
ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), |
|
|
ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), |
|
|
}) |
|
|
}) |
|
|
s.SetNodePrivKey(privKey) |
|
|
|
|
|
|
|
|
s.SetNodeKey(nodeKey) |
|
|
|
|
|
s.SetLogger(log.TestingLogger()) |
|
|
return s |
|
|
return s |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (sw *Switch) addPeerWithConnection(conn net.Conn) error { |
|
|
func (sw *Switch) addPeerWithConnection(conn net.Conn) error { |
|
|
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) |
|
|
|
|
|
|
|
|
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if err := conn.Close(); err != nil { |
|
|
if err := conn.Close(); err != nil { |
|
|
sw.Logger.Error("Error closing connection", "err", err) |
|
|
sw.Logger.Error("Error closing connection", "err", err) |
|
@ -628,7 +640,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { |
|
|
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { |
|
|
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) |
|
|
|
|
|
|
|
|
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if err := conn.Close(); err != nil { |
|
|
if err := conn.Close(); err != nil { |
|
|
sw.Logger.Error("Error closing connection", "err", err) |
|
|
sw.Logger.Error("Error closing connection", "err", err) |
|
|