|
@ -17,7 +17,7 @@ import ( |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
var ( |
|
|
var ( |
|
|
_ service.Service = (*ReactorV2)(nil) |
|
|
|
|
|
|
|
|
_ service.Service = (*Reactor)(nil) |
|
|
_ p2p.Wrapper = (*protop2p.PexMessage)(nil) |
|
|
_ p2p.Wrapper = (*protop2p.PexMessage)(nil) |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
@ -73,11 +73,6 @@ func ChannelDescriptor() conn.ChannelDescriptor { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
|
|
|
|
|
|
// is Reactor.
|
|
|
|
|
|
//
|
|
|
|
|
|
// FIXME: Rename this when Reactor is removed, and consider moving to p2p/.
|
|
|
|
|
|
//
|
|
|
|
|
|
// The peer exchange or PEX reactor supports the peer manager by sending
|
|
|
// The peer exchange or PEX reactor supports the peer manager by sending
|
|
|
// requests to other peers for addresses that can be given to the peer manager
|
|
|
// requests to other peers for addresses that can be given to the peer manager
|
|
|
// and at the same time advertises addresses to peers that need more.
|
|
|
// and at the same time advertises addresses to peers that need more.
|
|
@ -86,7 +81,7 @@ func ChannelDescriptor() conn.ChannelDescriptor { |
|
|
// increasing the interval between each request. It tracks connected peers via
|
|
|
// increasing the interval between each request. It tracks connected peers via
|
|
|
// a linked list, sending a request to the node at the front of the list and
|
|
|
// a linked list, sending a request to the node at the front of the list and
|
|
|
// adding it to the back of the list once a response is received.
|
|
|
// adding it to the back of the list once a response is received.
|
|
|
type ReactorV2 struct { |
|
|
|
|
|
|
|
|
type Reactor struct { |
|
|
service.BaseService |
|
|
service.BaseService |
|
|
|
|
|
|
|
|
peerManager *p2p.PeerManager |
|
|
peerManager *p2p.PeerManager |
|
@ -125,14 +120,14 @@ type ReactorV2 struct { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// NewReactor returns a reference to a new reactor.
|
|
|
// NewReactor returns a reference to a new reactor.
|
|
|
func NewReactorV2( |
|
|
|
|
|
|
|
|
func NewReactor( |
|
|
logger log.Logger, |
|
|
logger log.Logger, |
|
|
peerManager *p2p.PeerManager, |
|
|
peerManager *p2p.PeerManager, |
|
|
pexCh *p2p.Channel, |
|
|
pexCh *p2p.Channel, |
|
|
peerUpdates *p2p.PeerUpdates, |
|
|
peerUpdates *p2p.PeerUpdates, |
|
|
) *ReactorV2 { |
|
|
|
|
|
|
|
|
) *Reactor { |
|
|
|
|
|
|
|
|
r := &ReactorV2{ |
|
|
|
|
|
|
|
|
r := &Reactor{ |
|
|
peerManager: peerManager, |
|
|
peerManager: peerManager, |
|
|
pexCh: pexCh, |
|
|
pexCh: pexCh, |
|
|
peerUpdates: peerUpdates, |
|
|
peerUpdates: peerUpdates, |
|
@ -150,7 +145,7 @@ func NewReactorV2( |
|
|
// envelopes on each. In addition, it also listens for peer updates and handles
|
|
|
// envelopes on each. In addition, it also listens for peer updates and handles
|
|
|
// messages on that p2p channel accordingly. The caller must be sure to execute
|
|
|
// messages on that p2p channel accordingly. The caller must be sure to execute
|
|
|
// OnStop to ensure the outbound p2p Channels are closed.
|
|
|
// OnStop to ensure the outbound p2p Channels are closed.
|
|
|
func (r *ReactorV2) OnStart() error { |
|
|
|
|
|
|
|
|
func (r *Reactor) OnStart() error { |
|
|
go r.processPexCh() |
|
|
go r.processPexCh() |
|
|
go r.processPeerUpdates() |
|
|
go r.processPeerUpdates() |
|
|
return nil |
|
|
return nil |
|
@ -158,7 +153,7 @@ func (r *ReactorV2) OnStart() error { |
|
|
|
|
|
|
|
|
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
|
|
|
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
|
|
|
// blocking until they all exit.
|
|
|
// blocking until they all exit.
|
|
|
func (r *ReactorV2) OnStop() { |
|
|
|
|
|
|
|
|
func (r *Reactor) OnStop() { |
|
|
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
|
|
|
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
|
|
|
// p2p Channels should execute Close().
|
|
|
// p2p Channels should execute Close().
|
|
|
close(r.closeCh) |
|
|
close(r.closeCh) |
|
@ -172,7 +167,7 @@ func (r *ReactorV2) OnStop() { |
|
|
|
|
|
|
|
|
// processPexCh implements a blocking event loop where we listen for p2p
|
|
|
// processPexCh implements a blocking event loop where we listen for p2p
|
|
|
// Envelope messages from the pexCh.
|
|
|
// Envelope messages from the pexCh.
|
|
|
func (r *ReactorV2) processPexCh() { |
|
|
|
|
|
|
|
|
func (r *Reactor) processPexCh() { |
|
|
defer r.pexCh.Close() |
|
|
defer r.pexCh.Close() |
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
@ -202,7 +197,7 @@ func (r *ReactorV2) processPexCh() { |
|
|
// processPeerUpdates initiates a blocking process where we listen for and handle
|
|
|
// processPeerUpdates initiates a blocking process where we listen for and handle
|
|
|
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
|
|
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
|
|
|
// close the p2p PeerUpdatesCh gracefully.
|
|
|
// close the p2p PeerUpdatesCh gracefully.
|
|
|
func (r *ReactorV2) processPeerUpdates() { |
|
|
|
|
|
|
|
|
func (r *Reactor) processPeerUpdates() { |
|
|
defer r.peerUpdates.Close() |
|
|
defer r.peerUpdates.Close() |
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
@ -218,7 +213,7 @@ func (r *ReactorV2) processPeerUpdates() { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// handlePexMessage handles envelopes sent from peers on the PexChannel.
|
|
|
// handlePexMessage handles envelopes sent from peers on the PexChannel.
|
|
|
func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { |
|
|
|
|
|
|
|
|
func (r *Reactor) handlePexMessage(envelope p2p.Envelope) error { |
|
|
logger := r.Logger.With("peer", envelope.From) |
|
|
logger := r.Logger.With("peer", envelope.From) |
|
|
|
|
|
|
|
|
switch msg := envelope.Message.(type) { |
|
|
switch msg := envelope.Message.(type) { |
|
@ -337,7 +332,7 @@ func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { |
|
|
//
|
|
|
//
|
|
|
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
|
|
|
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
|
|
|
// on the operating system to cache it for us.
|
|
|
// on the operating system to cache it for us.
|
|
|
func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress { |
|
|
|
|
|
|
|
|
func (r *Reactor) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress { |
|
|
limit := len(addresses) |
|
|
limit := len(addresses) |
|
|
pexAddresses := make([]protop2p.PexAddress, 0, limit) |
|
|
pexAddresses := make([]protop2p.PexAddress, 0, limit) |
|
|
|
|
|
|
|
@ -380,7 +375,7 @@ func (r *ReactorV2) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress { |
|
|
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
|
|
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
|
|
// It will handle errors and any possible panics gracefully. A caller can handle
|
|
|
// It will handle errors and any possible panics gracefully. A caller can handle
|
|
|
// any error returned by sending a PeerError on the respective channel.
|
|
|
// any error returned by sending a PeerError on the respective channel.
|
|
|
func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { |
|
|
|
|
|
|
|
|
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { |
|
|
defer func() { |
|
|
defer func() { |
|
|
if e := recover(); e != nil { |
|
|
if e := recover(); e != nil { |
|
|
err = fmt.Errorf("panic in processing message: %v", e) |
|
|
err = fmt.Errorf("panic in processing message: %v", e) |
|
@ -407,7 +402,7 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er |
|
|
|
|
|
|
|
|
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
|
|
|
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
|
|
|
// send a request for addresses.
|
|
|
// send a request for addresses.
|
|
|
func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) { |
|
|
|
|
|
|
|
|
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { |
|
|
r.Logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) |
|
|
r.Logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) |
|
|
|
|
|
|
|
|
r.mtx.Lock() |
|
|
r.mtx.Lock() |
|
@ -424,7 +419,7 @@ func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time { |
|
|
|
|
|
|
|
|
func (r *Reactor) waitUntilNextRequest() <-chan time.Time { |
|
|
return time.After(time.Until(r.nextRequestTime)) |
|
|
return time.After(time.Until(r.nextRequestTime)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -432,7 +427,7 @@ func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time { |
|
|
// peer a request for more peer addresses. The function then moves the
|
|
|
// peer a request for more peer addresses. The function then moves the
|
|
|
// peer into the requestsSent bucket and calculates when the next request
|
|
|
// peer into the requestsSent bucket and calculates when the next request
|
|
|
// time should be
|
|
|
// time should be
|
|
|
func (r *ReactorV2) sendRequestForPeers() { |
|
|
|
|
|
|
|
|
func (r *Reactor) sendRequestForPeers() { |
|
|
r.mtx.Lock() |
|
|
r.mtx.Lock() |
|
|
defer r.mtx.Unlock() |
|
|
defer r.mtx.Unlock() |
|
|
if len(r.availablePeers) == 0 { |
|
|
if len(r.availablePeers) == 0 { |
|
@ -480,7 +475,7 @@ func (r *ReactorV2) sendRequestForPeers() { |
|
|
// new nodes will plummet to a very small number, meaning the interval expands
|
|
|
// new nodes will plummet to a very small number, meaning the interval expands
|
|
|
// to its upper bound.
|
|
|
// to its upper bound.
|
|
|
// CONTRACT: Must use a write lock as nextRequestTime is updated
|
|
|
// CONTRACT: Must use a write lock as nextRequestTime is updated
|
|
|
func (r *ReactorV2) calculateNextRequestTime() { |
|
|
|
|
|
|
|
|
func (r *Reactor) calculateNextRequestTime() { |
|
|
// check if the peer store is full. If so then there is no need
|
|
|
// check if the peer store is full. If so then there is no need
|
|
|
// to send peer requests too often
|
|
|
// to send peer requests too often
|
|
|
if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 { |
|
|
if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 { |
|
@ -516,7 +511,7 @@ func (r *ReactorV2) calculateNextRequestTime() { |
|
|
r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio)) |
|
|
r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (r *ReactorV2) markPeerRequest(peer types.NodeID) error { |
|
|
|
|
|
|
|
|
func (r *Reactor) markPeerRequest(peer types.NodeID) error { |
|
|
r.mtx.Lock() |
|
|
r.mtx.Lock() |
|
|
defer r.mtx.Unlock() |
|
|
defer r.mtx.Unlock() |
|
|
if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok { |
|
|
if lastRequestTime, ok := r.lastReceivedRequests[peer]; ok { |
|
@ -529,7 +524,7 @@ func (r *ReactorV2) markPeerRequest(peer types.NodeID) error { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (r *ReactorV2) markPeerResponse(peer types.NodeID) error { |
|
|
|
|
|
|
|
|
func (r *Reactor) markPeerResponse(peer types.NodeID) error { |
|
|
r.mtx.Lock() |
|
|
r.mtx.Lock() |
|
|
defer r.mtx.Unlock() |
|
|
defer r.mtx.Unlock() |
|
|
// check if a request to this peer was sent
|
|
|
// check if a request to this peer was sent
|
|
@ -546,7 +541,7 @@ func (r *ReactorV2) markPeerResponse(peer types.NodeID) error { |
|
|
|
|
|
|
|
|
// all addresses must use a MCONN protocol for the peer to be considered part of the
|
|
|
// all addresses must use a MCONN protocol for the peer to be considered part of the
|
|
|
// legacy p2p pex system
|
|
|
// legacy p2p pex system
|
|
|
func (r *ReactorV2) isLegacyPeer(peer types.NodeID) bool { |
|
|
|
|
|
|
|
|
func (r *Reactor) isLegacyPeer(peer types.NodeID) bool { |
|
|
for _, addr := range r.peerManager.Addresses(peer) { |
|
|
for _, addr := range r.peerManager.Addresses(peer) { |
|
|
if addr.Protocol != p2p.MConnProtocol { |
|
|
if addr.Protocol != p2p.MConnProtocol { |
|
|
return false |
|
|
return false |
|
|