|
|
@ -95,7 +95,8 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { |
|
|
|
return sw |
|
|
|
} |
|
|
|
|
|
|
|
// AddReactor is not goroutine safe.
|
|
|
|
// AddReactor adds the given reactor to the switch.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { |
|
|
|
// Validate the reactor.
|
|
|
|
// No two reactors can share the same channel.
|
|
|
@ -113,43 +114,51 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { |
|
|
|
return reactor |
|
|
|
} |
|
|
|
|
|
|
|
// Reactors is not goroutine safe.
|
|
|
|
// Reactors returns a map of reactors registered on the switch.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) Reactors() map[string]Reactor { |
|
|
|
return sw.reactors |
|
|
|
} |
|
|
|
|
|
|
|
// Reactor is not goroutine safe.
|
|
|
|
// Reactor returns the reactor with the given name.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) Reactor(name string) Reactor { |
|
|
|
return sw.reactors[name] |
|
|
|
} |
|
|
|
|
|
|
|
// AddListener is not goroutine safe.
|
|
|
|
// AddListener adds the given listener to the switch for listening to incoming peer connections.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) AddListener(l Listener) { |
|
|
|
sw.listeners = append(sw.listeners, l) |
|
|
|
} |
|
|
|
|
|
|
|
// Listeners is not goroutine safe.
|
|
|
|
// Listeners returns the list of listeners the switch listens on.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) Listeners() []Listener { |
|
|
|
return sw.listeners |
|
|
|
} |
|
|
|
|
|
|
|
// IsListening is not goroutine safe.
|
|
|
|
// IsListening returns true if the switch has at least one listener.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) IsListening() bool { |
|
|
|
return len(sw.listeners) > 0 |
|
|
|
} |
|
|
|
|
|
|
|
// SetNodeInfo is not goroutine safe.
|
|
|
|
// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { |
|
|
|
sw.nodeInfo = nodeInfo |
|
|
|
} |
|
|
|
|
|
|
|
// NodeInfo is not goroutine safe.
|
|
|
|
// NodeInfo returns the switch's NodeInfo.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) NodeInfo() *NodeInfo { |
|
|
|
return sw.nodeInfo |
|
|
|
} |
|
|
|
|
|
|
|
// SetNodePrivKey is not goroutine safe.
|
|
|
|
// NOTE: Overwrites sw.nodeInfo.PubKey
|
|
|
|
// SetNodePrivKey sets the switche's private key for authenticated encryption.
|
|
|
|
// NOTE: Overwrites sw.nodeInfo.PubKey.
|
|
|
|
// NOTE: Not goroutine safe.
|
|
|
|
func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { |
|
|
|
sw.nodePrivKey = nodePrivKey |
|
|
|
if sw.nodeInfo != nil { |
|
|
@ -157,7 +166,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Switch.Start() starts all the reactors, peers, and listeners.
|
|
|
|
// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
|
|
|
|
func (sw *Switch) OnStart() error { |
|
|
|
sw.BaseService.OnStart() |
|
|
|
// Start reactors
|
|
|
@ -178,6 +187,7 @@ func (sw *Switch) OnStart() error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// OnStop implements BaseService. It stops all listeners, peers, and reactors.
|
|
|
|
func (sw *Switch) OnStop() { |
|
|
|
sw.BaseService.OnStop() |
|
|
|
// Stop listeners
|
|
|
@ -196,6 +206,8 @@ func (sw *Switch) OnStop() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// AddPeer checks the given peer's validity, performs a handshake, and adds the peer to the switch
|
|
|
|
// and to all registered reactors.
|
|
|
|
// NOTE: This performs a blocking handshake before the peer is added.
|
|
|
|
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
|
|
|
|
func (sw *Switch) AddPeer(peer *Peer) error { |
|
|
@ -243,6 +255,7 @@ func (sw *Switch) AddPeer(peer *Peer) error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// FilterConnByAddr returns an error if connecting to the given address is forbidden.
|
|
|
|
func (sw *Switch) FilterConnByAddr(addr net.Addr) error { |
|
|
|
if sw.filterConnByAddr != nil { |
|
|
|
return sw.filterConnByAddr(addr) |
|
|
@ -250,6 +263,7 @@ func (sw *Switch) FilterConnByAddr(addr net.Addr) error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// FilterConnByPubKey returns an error if connecting to the given public key is forbidden.
|
|
|
|
func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { |
|
|
|
if sw.filterConnByPubKey != nil { |
|
|
|
return sw.filterConnByPubKey(pubkey) |
|
|
@ -258,10 +272,12 @@ func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// SetAddrFilter sets the function for filtering connections by address.
|
|
|
|
func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { |
|
|
|
sw.filterConnByAddr = f |
|
|
|
} |
|
|
|
|
|
|
|
// SetPubKeyFilter sets the function for filtering connections by public key.
|
|
|
|
func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { |
|
|
|
sw.filterConnByPubKey = f |
|
|
|
} |
|
|
@ -316,6 +332,8 @@ func (sw *Switch) dialSeed(addr *NetAddress) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// DialPeerWithAddress dials the given peer and runs sw.AddPeer if it connects successfully.
|
|
|
|
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
|
|
|
|
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { |
|
|
|
sw.dialing.Set(addr.IP.String(), addr) |
|
|
|
defer sw.dialing.Delete(addr.IP.String()) |
|
|
@ -340,6 +358,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, |
|
|
|
return peer, nil |
|
|
|
} |
|
|
|
|
|
|
|
// IsDialing returns true if the switch is currently dialing the given address.
|
|
|
|
func (sw *Switch) IsDialing(addr *NetAddress) bool { |
|
|
|
return sw.dialing.Has(addr.IP.String()) |
|
|
|
} |
|
|
@ -348,6 +367,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { |
|
|
|
// trying to send for defaultSendTimeoutSeconds. Returns a channel
|
|
|
|
// which receives success values for each attempted send (false if times out)
|
|
|
|
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
|
|
|
// TODO: Something more intelligent.
|
|
|
|
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { |
|
|
|
successChan := make(chan bool, len(sw.peers.List())) |
|
|
|
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) |
|
|
@ -374,11 +394,13 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// Peers returns the set of peers the switch is connected to.
|
|
|
|
func (sw *Switch) Peers() IPeerSet { |
|
|
|
return sw.peers |
|
|
|
} |
|
|
|
|
|
|
|
// StopPeerForError disconnect from a peer due to external error, retry if it is a persistent peer.
|
|
|
|
// StopPeerForError disconnects from a peer due to external error.
|
|
|
|
// If the peer is persistent, it will attempt to reconnect.
|
|
|
|
// TODO: make record depending on reason.
|
|
|
|
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { |
|
|
|
addr := NewNetAddress(peer.Addr()) |
|
|
@ -411,7 +433,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// StopPeerGracefully disconnect from a peer gracefully.
|
|
|
|
// StopPeerGracefully disconnects from a peer gracefully.
|
|
|
|
// TODO: handle graceful disconnects.
|
|
|
|
func (sw *Switch) StopPeerGracefully(peer *Peer) { |
|
|
|
sw.Logger.Info("Stopping peer gracefully") |
|
|
@ -520,6 +542,8 @@ func Connect2Switches(switches []*Switch, i, j int) { |
|
|
|
<-doneCh |
|
|
|
} |
|
|
|
|
|
|
|
// StartSwitches calls sw.Start() for each given switch.
|
|
|
|
// It returns the first encountered error.
|
|
|
|
func StartSwitches(switches []*Switch) error { |
|
|
|
for _, s := range switches { |
|
|
|
_, err := s.Start() // start switch and reactors
|
|
|
|