@ -2,7 +2,6 @@ package p2p
import (
import (
"context"
"context"
"errors"
"fmt"
"fmt"
"io"
"io"
"sync"
"sync"
@ -73,9 +72,9 @@ import (
// forever on a channel that has no consumer.
// forever on a channel that has no consumer.
type Router struct {
type Router struct {
* service . BaseService
* service . BaseService
logger log . Logger
transports map [ Protocol ] Transport
store * peerStore
logger log . Logger
transports map [ Protocol ] Transport
peerManager * peerManager
// FIXME: Consider using sync.Map.
// FIXME: Consider using sync.Map.
peerMtx sync . RWMutex
peerMtx sync . RWMutex
@ -88,10 +87,6 @@ type Router struct {
channelQueues map [ ChannelID ] queue
channelQueues map [ ChannelID ] queue
channelMessages map [ ChannelID ] proto . Message
channelMessages map [ ChannelID ] proto . Message
peerUpdatesCh chan PeerUpdate
peerUpdatesMtx sync . RWMutex
peerUpdatesSubs map [ * PeerUpdatesCh ] * PeerUpdatesCh // keyed by struct identity (address)
// stopCh is used to signal router shutdown, by closing the channel.
// stopCh is used to signal router shutdown, by closing the channel.
stopCh chan struct { }
stopCh chan struct { }
}
}
@ -105,18 +100,16 @@ func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []Pee
router := & Router {
router := & Router {
logger : logger ,
logger : logger ,
transports : transports ,
transports : transports ,
store : newPeerStore ( ) ,
peerManager : newPeerManager ( newPeerStore ( ) ) ,
stopCh : make ( chan struct { } ) ,
stopCh : make ( chan struct { } ) ,
channelQueues : map [ ChannelID ] queue { } ,
channelQueues : map [ ChannelID ] queue { } ,
channelMessages : map [ ChannelID ] proto . Message { } ,
channelMessages : map [ ChannelID ] proto . Message { } ,
peerQueues : map [ NodeID ] queue { } ,
peerQueues : map [ NodeID ] queue { } ,
peerUpdatesCh : make ( chan PeerUpdate ) ,
peerUpdatesSubs : map [ * PeerUpdatesCh ] * PeerUpdatesCh { } ,
}
}
router . BaseService = service . NewBaseService ( logger , "router" , router )
router . BaseService = service . NewBaseService ( logger , "router" , router )
for _ , address := range peers {
for _ , address := range peers {
if err := router . store . Add ( address ) ; err != nil {
if err := router . peerManager . Add ( address ) ; err != nil {
logger . Error ( "failed to add peer" , "address" , address , "err" , err )
logger . Error ( "failed to add peer" , "address" , address , "err" , err )
}
}
}
}
@ -259,26 +252,31 @@ func (r *Router) acceptPeers(transport Transport) {
continue
continue
}
}
peerID := conn . NodeInfo ( ) . NodeID
if r . store . Claim ( peerID ) == nil {
r . logger . Error ( "already connected to peer, rejecting connection" , "peer" , peerID )
_ = conn . Close ( )
continue
}
go func ( ) {
defer func ( ) {
_ = conn . Close ( )
} ( )
peerID := conn . NodeInfo ( ) . NodeID
if err := r . peerManager . Accepted ( peerID ) ; err != nil {
r . logger . Error ( "failed to accept connection" , "peer" , peerID , "err" , err )
return
}
queue := newFIFOQueue ( )
r . peerMtx . Lock ( )
r . peerQueues [ peerID ] = queue
r . peerMtx . Unlock ( )
queue := newFIFOQueue ( )
r . peerMtx . Lock ( )
r . peerQueues [ peerID ] = queue
r . peerMtx . Unlock ( )
r . peerManager . Ready ( peerID )
go func ( ) {
defer func ( ) {
defer func ( ) {
r . peerMtx . Lock ( )
r . peerMtx . Lock ( )
delete ( r . peerQueues , peerID )
delete ( r . peerQueues , peerID )
r . peerMtx . Unlock ( )
r . peerMtx . Unlock ( )
queue . close ( )
queue . close ( )
_ = conn . Close ( )
r . store . Return ( peerID )
if err := r . peerManager . Disconnected ( peerID ) ; err != nil {
r . logger . Error ( "failed to disconnect peer" , "peer" , peerID , "err" , err )
}
} ( )
} ( )
r . routePeer ( peerID , conn , queue )
r . routePeer ( peerID , conn , queue )
@ -295,8 +293,11 @@ func (r *Router) dialPeers() {
default :
default :
}
}
peer := r . store . Dispense ( )
if peer == nil {
peerID , address , err := r . peerManager . DialNext ( )
if err != nil {
r . logger . Error ( "failed to find next peer to dial" , "err" , err )
return
} else if peerID == "" {
r . logger . Debug ( "no eligible peers, sleeping" )
r . logger . Debug ( "no eligible peers, sleeping" )
select {
select {
case <- time . After ( time . Second ) :
case <- time . After ( time . Second ) :
@ -307,63 +308,75 @@ func (r *Router) dialPeers() {
}
}
go func ( ) {
go func ( ) {
defer r . store . Return ( peer . ID )
conn , err := r . dialPeer ( peer )
conn , err := r . dialPeer ( address )
if err != nil {
if err != nil {
r . logger . Error ( "failed to dial peer, will retry" , "peer" , peer . ID )
r . logger . Error ( "failed to dial peer, will retry" , "peer" , peerID )
if err = r . peerManager . DialFailed ( peerID , address ) ; err != nil {
r . logger . Error ( "failed to report dial failure" , "peer" , peerID , "err" , err )
}
return
return
}
}
defer conn . Close ( )
defer conn . Close ( )
if err = r . peerManager . Dialed ( peerID , address ) ; err != nil {
r . logger . Error ( "failed to dial peer" , "peer" , peerID , "err" , err )
return
}
queue := newFIFOQueue ( )
queue := newFIFOQueue ( )
defer queue . close ( )
r . peerMtx . Lock ( )
r . peerMtx . Lock ( )
r . peerQueues [ peer . ID ] = queue
r . peerQueues [ peerID ] = queue
r . peerMtx . Unlock ( )
r . peerMtx . Unlock ( )
r . peerManager . Ready ( peerID )
defer func ( ) {
defer func ( ) {
r . peerMtx . Lock ( )
r . peerMtx . Lock ( )
delete ( r . peerQueues , peer . ID )
delete ( r . peerQueues , peerID )
r . peerMtx . Unlock ( )
r . peerMtx . Unlock ( )
queue . close ( )
if err := r . peerManager . Disconnected ( peerID ) ; err != nil {
r . logger . Error ( "failed to disconnect peer" , "peer" , peerID , "err" , err )
}
} ( )
} ( )
r . routePeer ( peer . ID , conn , queue )
r . routePeer ( peerID , conn , queue )
} ( )
} ( )
}
}
}
}
// dialPeer attempts to connect to a peer.
// dialPeer attempts to connect to a peer.
func ( r * Router ) dialPeer ( peer * peerInfo ) ( Connection , error ) {
func ( r * Router ) dialPeer ( address PeerAddress ) ( Connection , error ) {
ctx := context . Background ( )
ctx := context . Background ( )
for _ , address := range peer . Addresses {
resolveCtx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
r . logger . Info ( "resolving peer address" , "peer" , peer . ID , "address" , address )
endpoints , err := address . Resolve ( resolveCtx )
if err != nil {
r . logger . Error ( "failed to resolve address" , "address" , address , "err" , err )
resolveCtx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
r . logger . Info ( "resolving peer address" , "address" , address )
endpoints , err := address . Resolve ( resolveCtx )
if err != nil {
return nil , fmt . Errorf ( "failed to resolve address %q: %w" , address , err )
}
for _ , endpoint := range endpoints {
t , ok := r . transports [ endpoint . Protocol ]
if ! ok {
r . logger . Error ( "no transport found for protocol" , "protocol" , endpoint . Protocol )
continue
continue
}
}
for _ , endpoint := range endpoints {
t , ok := r . transports [ endpoint . Protocol ]
if ! ok {
r . logger . Error ( "no transport found for protocol" , "protocol" , endpoint . Protocol )
continue
}
dialCtx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
conn , err := t . Dial ( dialCtx , endpoint )
if err != nil {
r . logger . Error ( "failed to dial endpoint" , "endpoint" , endpoint )
} else {
r . logger . Info ( "connected to peer" , "peer" , peer . ID , "endpoint" , endpoint )
return conn , nil
}
dialCtx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
conn , err := t . Dial ( dialCtx , endpoint )
if err != nil {
r . logger . Error ( "failed to dial endpoint" , "endpoint" , endpoint )
} else {
r . logger . Info ( "connected to peer" , "peer" , address . NodeID ( ) , "endpoint" , endpoint )
return conn , nil
}
}
}
}
return nil , errors . New ( "failed to connect to peer" )
return nil , fmt . Errorf ( "failed to connect to peer via %q" , address )
}
}
// routePeer routes inbound messages from a peer to channels, and also sends
// routePeer routes inbound messages from a peer to channels, and also sends
@ -372,18 +385,7 @@ func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// connection or queue is closed.
// connection or queue is closed.
func ( r * Router ) routePeer ( peerID NodeID , conn Connection , sendQueue queue ) {
func ( r * Router ) routePeer ( peerID NodeID , conn Connection , sendQueue queue ) {
// FIXME: Peer updates should probably be handled by the peer store.
r . peerUpdatesCh <- PeerUpdate {
PeerID : peerID ,
Status : PeerStatusUp ,
}
defer func ( ) {
r . peerUpdatesCh <- PeerUpdate {
PeerID : peerID ,
Status : PeerStatusDown ,
}
} ( )
r . logger . Info ( "routing peer" , "peer" , peerID )
resultsCh := make ( chan error , 2 )
resultsCh := make ( chan error , 2 )
go func ( ) {
go func ( ) {
resultsCh <- r . receivePeer ( peerID , conn )
resultsCh <- r . receivePeer ( peerID , conn )
@ -479,64 +481,19 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
}
}
// SubscribePeerUpdates creates a new peer updates subscription. The caller must
// SubscribePeerUpdates creates a new peer updates subscription. The caller must
// consume the peer updates in a timely fashion, since delivery is guaranteed and
// will block peer connection/disconnection otherwise.
func ( r * Router ) SubscribePeerUpdates ( ) ( * PeerUpdatesCh , error ) {
// FIXME: We may want to use a size 1 buffer here. When the router
// broadcasts a peer update it has to loop over all of the
// subscriptions, and we want to avoid blocking and waiting for a
// context switch before continuing to the next subscription. This also
// prevents tail latencies from compounding across updates. We also want
// to make sure the subscribers are reasonably in sync, so it should be
// kept at 1. However, this should be benchmarked first.
peerUpdates := NewPeerUpdates ( make ( chan PeerUpdate ) )
r . peerUpdatesMtx . Lock ( )
r . peerUpdatesSubs [ peerUpdates ] = peerUpdates
r . peerUpdatesMtx . Unlock ( )
go func ( ) {
select {
case <- peerUpdates . Done ( ) :
r . peerUpdatesMtx . Lock ( )
delete ( r . peerUpdatesSubs , peerUpdates )
r . peerUpdatesMtx . Unlock ( )
case <- r . stopCh :
}
} ( )
return peerUpdates , nil
}
// broadcastPeerUpdates broadcasts peer updates received from the router
// to all subscriptions.
func ( r * Router ) broadcastPeerUpdates ( ) {
for {
select {
case peerUpdate := <- r . peerUpdatesCh :
subs := [ ] * PeerUpdatesCh { }
r . peerUpdatesMtx . RLock ( )
for _ , sub := range r . peerUpdatesSubs {
subs = append ( subs , sub )
}
r . peerUpdatesMtx . RUnlock ( )
for _ , sub := range subs {
select {
case sub . updatesCh <- peerUpdate :
case <- sub . doneCh :
case <- r . stopCh :
return
}
}
case <- r . stopCh :
return
}
}
// consume the peer updates in a timely fashion and close the subscription when
// done, since delivery is guaranteed and will block peer
// connection/disconnection otherwise.
//
// FIXME: Consider having callers just use peerManager.Subscribe() directly, if
// we export peerManager and make it an injected dependency (which we probably
// should).
func ( r * Router ) SubscribePeerUpdates ( ) * PeerUpdatesCh {
return r . peerManager . Subscribe ( )
}
}
// OnStart implements service.Service.
// OnStart implements service.Service.
func ( r * Router ) OnStart ( ) error {
func ( r * Router ) OnStart ( ) error {
go r . broadcastPeerUpdates ( )
go r . dialPeers ( )
go r . dialPeers ( )
for _ , transport := range r . transports {
for _ , transport := range r . transports {
go r . acceptPeers ( transport )
go r . acceptPeers ( transport )