diff --git a/p2p/channel.go b/p2p/channel.go
index 26808afc7..b2bcd156e 100644
--- a/p2p/channel.go
+++ b/p2p/channel.go
@@ -15,6 +15,16 @@ type Envelope struct {
 	To        NodeID        // Message receiver, or empty for inbound messages.
 	Broadcast bool          // Send message to all connected peers, ignoring To.
 	Message   proto.Message // Payload.
+
+	// For internal use in the Router.
+	channelID ChannelID
+}
+
+// Strip strips internal information from the envelope. Primarily used for
+// testing, such that returned envelopes can be compared with literals.
+func (e Envelope) Strip() Envelope {
+	e.channelID = 0
+	return e
 }
 
 // Channel is a bidirectional channel for Protobuf message exchange with peers.
diff --git a/p2p/peer.go b/p2p/peer.go
index 3d9772622..a9839ddfe 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -1,20 +1,128 @@
 package p2p
 
 import (
+	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net"
+	"net/url"
 	"runtime/debug"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/tendermint/tendermint/libs/cmap"
 	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/libs/service"
-	tmconn "github.com/tendermint/tendermint/p2p/conn"
 )
 
+// PeerAddress is a peer address URL.
+type PeerAddress struct {
+	*url.URL
+}
+
+// ParsePeerAddress parses a peer address URL into a PeerAddress.
+func ParsePeerAddress(address string) (PeerAddress, error) {
+	u, err := url.Parse(address)
+	if err != nil || u == nil {
+		return PeerAddress{}, fmt.Errorf("unable to parse peer address %q: %w", address, err)
+	}
+	if u.Scheme == "" {
+		u.Scheme = string(defaultProtocol)
+	}
+	pa := PeerAddress{URL: u}
+	if err = pa.Validate(); err != nil {
+		return PeerAddress{}, err
+	}
+	return pa, nil
+}
+
+// NodeID returns the address node ID.
+func (a PeerAddress) NodeID() NodeID {
+	return NodeID(a.User.Username())
+}
+
+// Resolve resolves a PeerAddress into a set of Endpoints, by expanding
+// out a DNS name in Host to its IP addresses. Field mapping:
+//
+//   Scheme                     → Endpoint.Protocol
+//   Host                       → Endpoint.IP
+//   User                       → Endpoint.PeerID
+//   Port                       → Endpoint.Port
+//   Path+Query+Fragment,Opaque → Endpoint.Path
+//
+func (a PeerAddress) Resolve(ctx context.Context) ([]Endpoint, error) {
+	ips, err := net.DefaultResolver.LookupIP(ctx, "ip", a.Host)
+	if err != nil {
+		return nil, err
+	}
+	port, err := a.parsePort()
+	if err != nil {
+		return nil, err
+	}
+
+	path := a.Path
+	if a.RawPath != "" {
+		path = a.RawPath
+	}
+	if a.Opaque != "" { // used for e.g. "about:blank" style URLs
+		path = a.Opaque
+	}
+	if a.RawQuery != "" {
+		path += "?" + a.RawQuery
+	}
+	if a.RawFragment != "" {
+		path += "#" + a.RawFragment
+	}
+
+	endpoints := make([]Endpoint, len(ips))
+	for i, ip := range ips {
+		endpoints[i] = Endpoint{
+			PeerID:   a.NodeID(),
+			Protocol: Protocol(a.Scheme),
+			IP:       ip,
+			Port:     port,
+			Path:     path,
+		}
+	}
+	return endpoints, nil
+}
+
+// Validate validates a PeerAddress.
+func (a PeerAddress) Validate() error {
+	if a.Scheme == "" {
+		return errors.New("no protocol")
+	}
+	if id := a.User.Username(); id == "" {
+		return errors.New("no peer ID")
+	} else if err := NodeID(id).Validate(); err != nil {
+		return fmt.Errorf("invalid peer ID: %w", err)
+	}
+	if a.Hostname() == "" && len(a.Query()) == 0 && a.Opaque == "" {
+		return errors.New("no host or path given")
+	}
+	if port, err := a.parsePort(); err != nil {
+		return err
+	} else if port > 0 && a.Hostname() == "" {
+		return errors.New("cannot specify port without host")
+	}
+	return nil
+}
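
As a usage sketch (illustrative, not part of the diff): parsing a peer address and resolving it into dialable endpoints. The address below is a placeholder, and the node ID in its user component would have to pass NodeID.Validate():

func examplePeerAddress(ctx context.Context) error {
	// Hypothetical address; the scheme defaults to mconn if omitted.
	address, err := ParsePeerAddress("mconn://nodeid@validator.example.com:26657")
	if err != nil {
		return err
	}
	// Resolve expands the DNS name into one Endpoint per IP address.
	endpoints, err := address.Resolve(ctx)
	if err != nil {
		return err
	}
	for _, endpoint := range endpoints {
		fmt.Printf("peer %v is dialable at %v\n", address.NodeID(), endpoint)
	}
	return nil
}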
+
+// parsePort returns the port number as a uint16.
+func (a PeerAddress) parsePort() (uint16, error) {
+	if portString := a.Port(); portString != "" {
+		port64, err := strconv.ParseUint(portString, 10, 16)
+		if err != nil {
+			return 0, fmt.Errorf("invalid port %q: %w", portString, err)
+		}
+		return uint16(port64), nil
+	}
+	return 0, nil
+}
+
 // PeerStatus specifies peer statuses.
 type PeerStatus string
 
@@ -106,6 +214,126 @@ type PeerUpdate struct {
 	Status PeerStatus
 }
 
+// peerStore manages information about peers. It is currently a bare-bones
+// in-memory store of peer addresses, and will be fleshed out later.
+//
+// The main function of peerStore is currently to dispense peers to connect to
+// (via peerStore.Dispense), giving the caller exclusive "ownership" of that
+// peer until the peer is returned (via peerStore.Return). This is used to
+// schedule and synchronize peer dialing and accepting in the Router, e.g.
+// making sure we only have a single connection (in either direction) to peers.
+type peerStore struct {
+	mtx     sync.Mutex
+	peers   map[NodeID]*peerInfo
+	claimed map[NodeID]bool
+}
+
+// newPeerStore creates a new peer store.
+func newPeerStore() *peerStore {
+	return &peerStore{
+		peers:   map[NodeID]*peerInfo{},
+		claimed: map[NodeID]bool{},
+	}
+}
+
+// Add adds a peer to the store, given as an address.
+func (s *peerStore) Add(address PeerAddress) error {
+	if err := address.Validate(); err != nil {
+		return err
+	}
+	peerID := address.NodeID()
+
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	peer, ok := s.peers[peerID]
+	if !ok {
+		peer = newStorePeer(peerID)
+		s.peers[peerID] = peer
+	} else if s.claimed[peerID] {
+		// FIXME: We need to handle modifications of claimed peers somehow.
+		return fmt.Errorf("peer %q is claimed", peerID)
+	}
+	peer.AddAddress(address)
+	return nil
+}
+
+// Claim claims a peer. The caller has exclusive ownership of the peer, and must
+// return it by calling Return(). Returns nil if the peer could not be claimed.
+// If the peer is not known to the store, it is registered and claimed.
+func (s *peerStore) Claim(id NodeID) *peerInfo {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	if s.claimed[id] {
+		return nil
+	}
+	peer, ok := s.peers[id]
+	if !ok {
+		peer = newStorePeer(id)
+		s.peers[id] = peer
+	}
+	s.claimed[id] = true
+	return peer
+}
+
+// Dispense finds an appropriate peer to contact and claims it. The caller has
+// exclusive ownership of the peer, and must return it by calling Return(). The
+// peer will not be dispensed again until returned.
+//
+// Returns nil if no appropriate peers are available.
+func (s *peerStore) Dispense() *peerInfo {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	for key, peer := range s.peers {
+		switch {
+		case len(peer.Addresses) == 0:
+		case s.claimed[key]:
+		default:
+			s.claimed[key] = true
+			return peer
+		}
+	}
+	return nil
+}
+
+// Return returns a claimed peer, making it available for other
+// callers to claim.
+func (s *peerStore) Return(id NodeID) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	delete(s.claimed, id)
+}
+
+// peerInfo is a peer stored in the peerStore.
+//
+// FIXME: This should be renamed peer or something else once the old peer is
+// removed.
+type peerInfo struct {
+	ID        NodeID
+	Addresses []PeerAddress
+}
+
+// newStorePeer creates a new peerInfo.
+func newStorePeer(id NodeID) *peerInfo {
+	return &peerInfo{
+		ID:        id,
+		Addresses: []PeerAddress{},
+	}
+}
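
To illustrate the claim semantics, here is a sketch of the intended Add → Dispense → Return lifecycle, as seen from a hypothetical caller inside package p2p (not part of this diff):

func examplePeerStoreLifecycle(store *peerStore, address PeerAddress) error {
	// Add validates the address and deduplicates it per peer.
	if err := store.Add(address); err != nil {
		return err
	}
	// Dispense claims an eligible peer; nil means none are available.
	peer := store.Dispense()
	if peer == nil {
		return nil
	}
	// While claimed, no other dialer or acceptor can own this peer, so we
	// cannot end up with duplicate connections. Return releases the claim.
	defer store.Return(peer.ID)
	// ... dial peer.Addresses here ...
	return nil
}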
+
+// AddAddress adds an address to a peer, unless it already exists. It does not
+// validate the address.
+func (p *peerInfo) AddAddress(address PeerAddress) {
+	// We just do a linear search for now.
+	addressString := address.String()
+	for _, a := range p.Addresses {
+		if a.String() == addressString {
+			return
+		}
+	}
+	p.Addresses = append(p.Addresses, address)
+}
+
 // ============================================================================
 // Types and business logic below may be deprecated.
 //
diff --git a/p2p/queue.go b/p2p/queue.go
new file mode 100644
index 000000000..873792f0e
--- /dev/null
+++ b/p2p/queue.go
@@ -0,0 +1,59 @@
+package p2p
+
+import "sync"
+
+// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
+// to some policy. Queues are used at contention points, i.e.:
+//
+// - Receiving inbound messages to a single channel from all peers.
+// - Sending outbound messages to a single peer from all channels.
+type queue interface {
+	// enqueue returns a channel for submitting envelopes.
+	enqueue() chan<- Envelope
+
+	// dequeue returns a channel ordered according to some queueing policy.
+	dequeue() <-chan Envelope
+
+	// close closes the queue. After this call enqueue() will block, so the
+	// caller must select on closed() as well to avoid blocking forever. The
+	// enqueue() and dequeue() channels will not be closed.
+	close()
+
+	// closed returns a channel that's closed when the queue is closed.
+	closed() <-chan struct{}
+}
+
+// fifoQueue is a simple unbuffered lossless queue that passes messages through
+// in the order they were received, and blocks the sender until the message is
+// received.
+type fifoQueue struct {
+	queueCh   chan Envelope
+	closeCh   chan struct{}
+	closeOnce sync.Once
+}
+
+var _ queue = (*fifoQueue)(nil)
+
+func newFIFOQueue() *fifoQueue {
+	return &fifoQueue{
+		queueCh: make(chan Envelope),
+		closeCh: make(chan struct{}),
+	}
+}
+
+func (q *fifoQueue) enqueue() chan<- Envelope {
+	return q.queueCh
+}
+
+func (q *fifoQueue) dequeue() <-chan Envelope {
+	return q.queueCh
+}
+
+func (q *fifoQueue) close() {
+	q.closeOnce.Do(func() {
+		close(q.closeCh)
+	})
+}
+
+func (q *fifoQueue) closed() <-chan struct{} {
+	return q.closeCh
+}
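
A consumer is expected to select on closed() alongside dequeue(), since the channels themselves are never closed. A minimal sketch of such a loop (illustrative only; the router code below follows the same pattern):

func exampleQueueConsumer(q queue) {
	for {
		select {
		case envelope := <-q.dequeue():
			_ = envelope // process the envelope; fifoQueue preserves order
		case <-q.closed():
			// The queue was closed; dequeue() would now block forever.
			return
		}
	}
}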
diff --git a/p2p/router.go b/p2p/router.go
new file mode 100644
index 000000000..ec63447f2
--- /dev/null
+++ b/p2p/router.go
@@ -0,0 +1,568 @@
+package p2p
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/libs/service"
+)
+
+// Router manages peer connections and routes messages between peers and reactor
+// channels. This is an early prototype.
+//
+// Channels are registered via OpenChannel(). When called, we register an input
+// message queue for the channel in channelQueues and spawn off a goroutine for
+// Router.routeChannel(). This goroutine reads off outbound messages and puts
+// them in the appropriate peer message queue, and processes peer errors which
+// will close (and thus disconnect) the appropriate peer queue. It runs until
+// either the channel is closed by the caller or the router is stopped, at which
+// point the input message queue is closed and removed.
+//
+// On startup, the router spawns off two primary goroutines that maintain
+// connections to peers and run for the lifetime of the router:
+//
+//   Router.dialPeers(): in a loop, asks the peerStore to dispense an
+//   eligible peer to connect to, and attempts to resolve and dial each
+//   address until successful.
+//
+//   Router.acceptPeers(): in a loop, waits for the next inbound connection
+//   from a peer, and attempts to claim it in the peerStore.
+//
+// Once either an inbound or outbound connection has been made, an outbound
+// message queue is registered in Router.peerQueues and a goroutine is spawned
+// off for Router.routePeer() which will spawn off additional goroutines for
+// Router.sendPeer() that sends outbound messages from the peer queue over the
+// connection and for Router.receivePeer() that reads inbound messages from
+// the connection and places them in the appropriate channel queue. When either
+// goroutine exits, the connection and peer queue are closed, which will cause
+// the other goroutines to close as well.
+//
+// The peerStore is used to coordinate peer connections, by only allowing a peer
+// to be claimed (owned) by a single caller at a time (both for outbound and
+// inbound connections). This is done either via peerStore.Dispense() which
+// dispenses and claims an eligible peer to dial, or via peerStore.Claim() which
+// attempts to claim a given peer for an inbound connection. Peers must be
+// returned to the peerStore with peerStore.Return() to release the claim. Over
+// time, the peerStore will also do peer scheduling and prioritization, e.g.
+// ensuring we do exponential backoff on dial failures and connecting to
+// more important peers first (such as persistent peers and validators).
+//
+// An additional goroutine Router.broadcastPeerUpdates() is also spawned off
+// on startup, which consumes peer updates from Router.peerUpdatesCh (currently
+// only connections and disconnections), and broadcasts them to all peer update
+// subscriptions registered via SubscribePeerUpdates().
+//
+// On router shutdown, we close Router.stopCh which will signal to all
+// goroutines to terminate. This in turn will cause all pending channel/peer
+// queues to close, and we wait for this as a signal that goroutines have ended.
+//
+// All message scheduling should be limited to the queue implementations used
+// for channel queues and peer queues. All message sending throughout the router
+// is blocking, and if any messages should be dropped or buffered this is the
+// sole responsibility of the queue, such that we can limit this logic to a
+// single place. There is currently only a FIFO queue implementation that always
+// blocks and never drops messages, but this must be improved with other
+// implementations. The only exception is that all message sending must also
+// select on appropriate channel/queue/router closure signals, to avoid blocking
+// forever on a channel that has no consumer.
+type Router struct {
+	*service.BaseService
+	logger     log.Logger
+	transports map[Protocol]Transport
+	store      *peerStore
+
+	// FIXME: Consider using sync.Map.
+	peerMtx    sync.RWMutex
+	peerQueues map[NodeID]queue
+
+	// FIXME: We don't strictly need to use a mutex for this if we seal the
+	// channels on router start. This depends on whether we want to allow
+	// dynamic channels in the future.
+	channelMtx      sync.RWMutex
+	channelQueues   map[ChannelID]queue
+	channelMessages map[ChannelID]proto.Message
+
+	peerUpdatesCh   chan PeerUpdate
+	peerUpdatesMtx  sync.RWMutex
+	peerUpdatesSubs map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
+
+	// stopCh is used to signal router shutdown, by closing the channel.
+	stopCh chan struct{}
+}
+
+// NewRouter creates a new Router, dialing the given peers.
+//
+// FIXME: Providing protocol/transport maps is cumbersome in tests; we should
+// consider adding Protocols() to the Transport interface instead and register
+// protocol/transport mappings automatically on a first-come basis.
+func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []PeerAddress) *Router {
+	router := &Router{
+		logger:          logger,
+		transports:      transports,
+		store:           newPeerStore(),
+		stopCh:          make(chan struct{}),
+		channelQueues:   map[ChannelID]queue{},
+		channelMessages: map[ChannelID]proto.Message{},
+		peerQueues:      map[NodeID]queue{},
+		peerUpdatesCh:   make(chan PeerUpdate),
+		peerUpdatesSubs: map[*PeerUpdatesCh]*PeerUpdatesCh{},
+	}
+	router.BaseService = service.NewBaseService(logger, "router", router)
+
+	for _, address := range peers {
+		if err := router.store.Add(address); err != nil {
+			logger.Error("failed to add peer", "address", address, "err", err)
+		}
+	}
+
+	return router
+}
+
+// OpenChannel opens a new channel for the given message type. The caller must
+// close the channel when done, and this must happen before the router stops.
+func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
+	// FIXME: NewChannel should take directional channels so we can pass
+	// queue.dequeue() instead of reaching inside for queue.queueCh.
+	queue := newFIFOQueue()
+	channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))
+
+	r.channelMtx.Lock()
+	defer r.channelMtx.Unlock()
+
+	if _, ok := r.channelQueues[id]; ok {
+		return nil, fmt.Errorf("channel %v already exists", id)
+	}
+	r.channelQueues[id] = queue
+	r.channelMessages[id] = messageType
+
+	go func() {
+		defer func() {
+			r.channelMtx.Lock()
+			delete(r.channelQueues, id)
+			delete(r.channelMessages, id)
+			r.channelMtx.Unlock()
+			queue.close()
+		}()
+		r.routeChannel(channel)
+	}()
+
+	return channel, nil
+}
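
A condensed sketch of what a reactor does with an open channel, assuming the gogotypes import for a payload type (the full version appears in router_test.go at the end of this diff):

func exampleEchoChannel(router *Router) error {
	channel, err := router.OpenChannel(ChannelID(1), &gogotypes.StringValue{})
	if err != nil {
		return err
	}
	defer channel.Close()
	for {
		select {
		case envelope := <-channel.In():
			// Echo each inbound message back to its sender.
			channel.Out() <- Envelope{To: envelope.From, Message: envelope.Message}
		case <-channel.Done():
			return nil
		}
	}
}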
+
+// routeChannel receives outbound messages and errors from a channel and routes
+// them to the appropriate peer. It returns when either the channel is closed or
+// the router is shutting down.
+func (r *Router) routeChannel(channel *Channel) {
+	for {
+		select {
+		case envelope, ok := <-channel.outCh:
+			if !ok {
+				return
+			}
+
+			// FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
+			// to return a wrapped copy.
+			if _, ok := channel.messageType.(Wrapper); ok {
+				wrapper := proto.Clone(channel.messageType)
+				if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
+					r.logger.Error("failed to wrap message", "err", err)
+					continue
+				}
+				envelope.Message = wrapper
+			}
+			envelope.channelID = channel.id
+
+			if envelope.Broadcast {
+				r.peerMtx.RLock()
+				peerQueues := make(map[NodeID]queue, len(r.peerQueues))
+				for peerID, peerQueue := range r.peerQueues {
+					peerQueues[peerID] = peerQueue
+				}
+				r.peerMtx.RUnlock()
+
+				for peerID, peerQueue := range peerQueues {
+					e := envelope
+					e.Broadcast = false
+					e.To = peerID
+					select {
+					case peerQueue.enqueue() <- e:
+					case <-peerQueue.closed():
+					case <-r.stopCh:
+						return
+					}
+				}
+
+			} else {
+				r.peerMtx.RLock()
+				peerQueue, ok := r.peerQueues[envelope.To]
+				r.peerMtx.RUnlock()
+				if !ok {
+					r.logger.Error("dropping message for non-connected peer",
+						"peer", envelope.To, "channel", channel.id)
+					continue
+				}
+
+				select {
+				case peerQueue.enqueue() <- envelope:
+				case <-peerQueue.closed():
+					r.logger.Error("dropping message for non-connected peer",
+						"peer", envelope.To, "channel", channel.id)
+				case <-r.stopCh:
+					return
+				}
+			}
+
+		case peerError, ok := <-channel.errCh:
+			if !ok {
+				return
+			}
+			// FIXME: We just disconnect the peer for now.
+			r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
+			r.peerMtx.RLock()
+			peerQueue, ok := r.peerQueues[peerError.PeerID]
+			r.peerMtx.RUnlock()
+			if ok {
+				peerQueue.close()
+			}
+
+		case <-channel.Done():
+			return
+		case <-r.stopCh:
+			return
+		}
+	}
+}
+
+// acceptPeers accepts inbound connections from peers on the given transport.
+func (r *Router) acceptPeers(transport Transport) {
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		default:
+		}
+
+		conn, err := transport.Accept(context.Background())
+		switch err {
+		case nil:
+		case ErrTransportClosed{}, io.EOF:
+			r.logger.Info("transport closed; stopping accept routine", "transport", transport)
+			return
+		default:
+			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
+			continue
+		}
+
+		peerID := conn.NodeInfo().NodeID
+		if r.store.Claim(peerID) == nil {
+			r.logger.Error("already connected to peer, rejecting connection", "peer", peerID)
+			_ = conn.Close()
+			continue
+		}
+
+		queue := newFIFOQueue()
+		r.peerMtx.Lock()
+		r.peerQueues[peerID] = queue
+		r.peerMtx.Unlock()
+
+		go func() {
+			defer func() {
+				r.peerMtx.Lock()
+				delete(r.peerQueues, peerID)
+				r.peerMtx.Unlock()
+				queue.close()
+				_ = conn.Close()
+				r.store.Return(peerID)
+			}()
+
+			r.routePeer(peerID, conn, queue)
+		}()
+	}
+}
+
+// dialPeers maintains outbound connections to peers.
+func (r *Router) dialPeers() {
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		default:
+		}
+
+		peer := r.store.Dispense()
+		if peer == nil {
+			r.logger.Debug("no eligible peers, sleeping")
+			select {
+			case <-time.After(time.Second):
+				continue
+			case <-r.stopCh:
+				return
+			}
+		}
+
+		go func() {
+			defer r.store.Return(peer.ID)
+			conn, err := r.dialPeer(peer)
+			if err != nil {
+				r.logger.Error("failed to dial peer, will retry", "peer", peer.ID, "err", err)
+				return
+			}
+			defer conn.Close()
+
+			queue := newFIFOQueue()
+			defer queue.close()
+			r.peerMtx.Lock()
+			r.peerQueues[peer.ID] = queue
+			r.peerMtx.Unlock()
+
+			defer func() {
+				r.peerMtx.Lock()
+				delete(r.peerQueues, peer.ID)
+				r.peerMtx.Unlock()
+			}()
+
+			r.routePeer(peer.ID, conn, queue)
+		}()
+	}
+}
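
The wrapping logic in routeChannel() above (and the unwrapping in receivePeer() below) relies on the existing Wrapper interface. As a sketch, a Protobuf container message with a oneof payload might implement it roughly as follows; exampleContainer, exampleContainer_Request and exampleRequest stand in for generated Protobuf types and are not part of this diff:

// Wrap stores the inner message in the container's oneof field.
func (m *exampleContainer) Wrap(inner proto.Message) error {
	switch inner := inner.(type) {
	case *exampleRequest:
		m.Sum = &exampleContainer_Request{Request: inner}
	default:
		return fmt.Errorf("unknown message %T", inner)
	}
	return nil
}

// Unwrap extracts the inner message from the container's oneof field.
func (m *exampleContainer) Unwrap() (proto.Message, error) {
	switch sum := m.Sum.(type) {
	case *exampleContainer_Request:
		return sum.Request, nil
	default:
		return nil, fmt.Errorf("unknown payload %T", sum)
	}
}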
+
+// dialPeer attempts to connect to a peer.
+func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
+	ctx := context.Background()
+
+	for _, address := range peer.Addresses {
+		resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		defer cancel()
+		r.logger.Info("resolving peer address", "peer", peer.ID, "address", address)
+		endpoints, err := address.Resolve(resolveCtx)
+		if err != nil {
+			r.logger.Error("failed to resolve address", "address", address, "err", err)
+			continue
+		}
+
+		for _, endpoint := range endpoints {
+			t, ok := r.transports[endpoint.Protocol]
+			if !ok {
+				r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
+				continue
+			}
+			dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			defer cancel()
+			conn, err := t.Dial(dialCtx, endpoint)
+			if err != nil {
+				r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
+			} else {
+				r.logger.Info("connected to peer", "peer", peer.ID, "endpoint", endpoint)
+				return conn, nil
+			}
+		}
+	}
+	return nil, errors.New("failed to connect to peer")
+}
+
+// routePeer routes inbound messages from a peer to channels, and also sends
+// outbound queued messages to the peer. When either direction fails or
+// finishes, it closes the connection and the send queue, using this as a
+// signal to shut down the internal receivePeer() and sendPeer() goroutines.
+// It blocks until the peer is done, e.g. when the connection or queue is
+// closed.
+func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
+	// FIXME: Peer updates should probably be handled by the peer store.
+	r.peerUpdatesCh <- PeerUpdate{
+		PeerID: peerID,
+		Status: PeerStatusUp,
+	}
+	defer func() {
+		r.peerUpdatesCh <- PeerUpdate{
+			PeerID: peerID,
+			Status: PeerStatusDown,
+		}
+	}()
+
+	resultsCh := make(chan error, 2)
+	go func() {
+		resultsCh <- r.receivePeer(peerID, conn)
+	}()
+	go func() {
+		resultsCh <- r.sendPeer(peerID, conn, sendQueue)
+	}()
+
+	err := <-resultsCh
+	_ = conn.Close()
+	sendQueue.close()
+	if e := <-resultsCh; err == nil {
+		// The first err was nil, so we update it with the second result,
+		// which may or may not be nil.
+		err = e
+	}
+	switch err {
+	case nil, io.EOF, ErrTransportClosed{}:
+		r.logger.Info("peer disconnected", "peer", peerID)
+	default:
+		r.logger.Error("peer failure", "peer", peerID, "err", err)
+	}
+}
+
+// receivePeer receives inbound messages from a peer, deserializes them and
+// passes them on to the appropriate channel.
+func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
+	for {
+		chID, bz, err := conn.ReceiveMessage()
+		if err != nil {
+			return err
+		}
+
+		r.channelMtx.RLock()
+		queue, ok := r.channelQueues[ChannelID(chID)]
+		messageType := r.channelMessages[ChannelID(chID)]
+		r.channelMtx.RUnlock()
+		if !ok {
+			r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
+			continue
+		}
+
+		msg := proto.Clone(messageType)
+		if err := proto.Unmarshal(bz, msg); err != nil {
+			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
+			continue
+		}
+		if wrapper, ok := msg.(Wrapper); ok {
+			msg, err = wrapper.Unwrap()
+			if err != nil {
+				r.logger.Error("failed to unwrap message", "err", err)
+				continue
+			}
+		}
+
+		select {
+		// FIXME: ReceiveMessage() should return ChannelID.
+		case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
+			r.logger.Debug("received message", "peer", peerID, "message", msg)
+		case <-queue.closed():
+			r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
+		case <-r.stopCh:
+			return nil
+		}
+	}
+}
+
+// sendPeer sends queued messages to a peer.
+func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
+	for {
+		select {
+		case envelope := <-queue.dequeue():
+			bz, err := proto.Marshal(envelope.Message)
+			if err != nil {
+				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
+				continue
+			}
+
+			// FIXME: SendMessage() should take ChannelID.
+			_, err = conn.SendMessage(byte(envelope.channelID), bz)
+			if err != nil {
+				return err
+			}
+			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)
+
+		case <-queue.closed():
+			return nil
+
+		case <-r.stopCh:
+			return nil
+		}
+	}
+}
+
+// SubscribePeerUpdates creates a new peer updates subscription. The caller must
+// consume the peer updates in a timely fashion, since delivery is guaranteed
+// and will otherwise block peer connection/disconnection.
+func (r *Router) SubscribePeerUpdates() (*PeerUpdatesCh, error) {
+	// FIXME: We may want to use a size-1 buffer here. When the router
+	// broadcasts a peer update it has to loop over all of the subscriptions,
+	// and we want to avoid blocking and waiting for a context switch before
+	// continuing to the next subscription. This also prevents tail latencies
+	// from compounding across updates. We also want to make sure the
+	// subscribers are reasonably in sync, so the buffer should be no larger
+	// than 1. However, this should be benchmarked first.
+	peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
+	r.peerUpdatesMtx.Lock()
+	r.peerUpdatesSubs[peerUpdates] = peerUpdates
+	r.peerUpdatesMtx.Unlock()
+
+	go func() {
+		select {
+		case <-peerUpdates.Done():
+			r.peerUpdatesMtx.Lock()
+			delete(r.peerUpdatesSubs, peerUpdates)
+			r.peerUpdatesMtx.Unlock()
+		case <-r.stopCh:
+		}
+	}()
+	return peerUpdates, nil
+}
+
+// broadcastPeerUpdates broadcasts peer updates received from the router
+// to all subscriptions.
+func (r *Router) broadcastPeerUpdates() {
+	for {
+		select {
+		case peerUpdate := <-r.peerUpdatesCh:
+			subs := []*PeerUpdatesCh{}
+			r.peerUpdatesMtx.RLock()
+			for _, sub := range r.peerUpdatesSubs {
+				subs = append(subs, sub)
+			}
+			r.peerUpdatesMtx.RUnlock()
+
+			for _, sub := range subs {
+				select {
+				case sub.updatesCh <- peerUpdate:
+				case <-sub.doneCh:
+				case <-r.stopCh:
+					return
+				}
+			}
+
+		case <-r.stopCh:
+			return
+		}
+	}
+}
+
+// OnStart implements service.Service.
+func (r *Router) OnStart() error {
+	go r.broadcastPeerUpdates()
+	go r.dialPeers()
+	for _, transport := range r.transports {
+		go r.acceptPeers(transport)
+	}
+	return nil
+}
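
Since peer update delivery is unbuffered, every subscriber needs a dedicated consumer loop. A typical one (hypothetical, not part of this diff) looks like this:

func examplePeerUpdateConsumer(router *Router) error {
	peerUpdates, err := router.SubscribePeerUpdates()
	if err != nil {
		return err
	}
	defer peerUpdates.Close()
	for {
		select {
		case peerUpdate := <-peerUpdates.Updates():
			// React to connects/disconnects. Blocking here stalls the
			// router's broadcast loop for all subscribers.
			_ = peerUpdate
		case <-peerUpdates.Done():
			return nil
		}
	}
}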
+
+// OnStop implements service.Service.
+func (r *Router) OnStop() {
+	// Collect all active queues, so we can wait for them to close.
+	queues := []queue{}
+	r.channelMtx.RLock()
+	for _, q := range r.channelQueues {
+		queues = append(queues, q)
+	}
+	r.channelMtx.RUnlock()
+	r.peerMtx.RLock()
+	for _, q := range r.peerQueues {
+		queues = append(queues, q)
+	}
+	r.peerMtx.RUnlock()
+
+	// Signal router shutdown, and wait for queues (and thus goroutines)
+	// to complete.
+	close(r.stopCh)
+	for _, q := range queues {
+		<-q.closed()
+	}
+}
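
The router documentation above notes that the lossless FIFO queue must eventually be complemented by other policies. As one possibility (a sketch, not part of this diff), a bounded queue that drops envelopes on overflow can implement the same interface by pumping through an internal goroutine:

// droppingQueue is a hypothetical bounded queue that drops the newest
// envelope when the buffer is full, rather than blocking the sender.
type droppingQueue struct {
	enqueueCh chan Envelope
	dequeueCh chan Envelope
	closeCh   chan struct{}
	closeOnce sync.Once
}

var _ queue = (*droppingQueue)(nil)

func newDroppingQueue(size int) *droppingQueue {
	q := &droppingQueue{
		enqueueCh: make(chan Envelope),
		dequeueCh: make(chan Envelope, size),
		closeCh:   make(chan struct{}),
	}
	go q.run()
	return q
}

// run pumps envelopes from the unbuffered enqueue channel into the bounded
// dequeue buffer, dropping the envelope when the buffer is full.
func (q *droppingQueue) run() {
	for {
		select {
		case envelope := <-q.enqueueCh:
			select {
			case q.dequeueCh <- envelope:
			default: // buffer full, drop the envelope
			}
		case <-q.closeCh:
			return
		}
	}
}

func (q *droppingQueue) enqueue() chan<- Envelope { return q.enqueueCh }

func (q *droppingQueue) dequeue() <-chan Envelope { return q.dequeueCh }

func (q *droppingQueue) close() { q.closeOnce.Do(func() { close(q.closeCh) }) }

func (q *droppingQueue) closed() <-chan struct{} { return q.closeCh }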
diff --git a/p2p/router_test.go b/p2p/router_test.go
new file mode 100644
index 000000000..042200909
--- /dev/null
+++ b/p2p/router_test.go
@@ -0,0 +1,117 @@
+package p2p_test
+
+import (
+	"errors"
+	"testing"
+
+	gogotypes "github.com/gogo/protobuf/types"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/p2p"
+)
+
+type TestMessage = gogotypes.StringValue
+
+func echoReactor(channel *p2p.Channel) {
+	for {
+		select {
+		case envelope := <-channel.In():
+			channel.Out() <- p2p.Envelope{
+				To:      envelope.From,
+				Message: &TestMessage{Value: envelope.Message.(*TestMessage).Value},
+			}
+		case <-channel.Done():
+			return
+		}
+	}
+}
+
+func TestRouter(t *testing.T) {
+	logger := log.TestingLogger()
+	network := p2p.NewMemoryNetwork(logger)
+	transport := network.GenerateTransport()
+	chID := p2p.ChannelID(1)
+
+	// Start some other in-memory network nodes to communicate with, running
+	// a simple echo reactor that returns received messages.
+	peers := []p2p.PeerAddress{}
+	for i := 0; i < 3; i++ {
+		peerTransport := network.GenerateTransport()
+		peerRouter := p2p.NewRouter(logger.With("peerID", i), map[p2p.Protocol]p2p.Transport{
+			p2p.MemoryProtocol: peerTransport,
+		}, nil)
+		peers = append(peers, peerTransport.Endpoints()[0].PeerAddress())
+
+		channel, err := peerRouter.OpenChannel(chID, &TestMessage{})
+		require.NoError(t, err)
+		defer channel.Close()
+		go echoReactor(channel)
+
+		err = peerRouter.Start()
+		require.NoError(t, err)
+		defer func() { require.NoError(t, peerRouter.Stop()) }()
+	}
+
+	// Start the main router and connect it to the peers above.
+	router := p2p.NewRouter(logger, map[p2p.Protocol]p2p.Transport{
+		p2p.MemoryProtocol: transport,
+	}, peers)
+	channel, err := router.OpenChannel(chID, &TestMessage{})
+	require.NoError(t, err)
+	peerUpdates, err := router.SubscribePeerUpdates()
+	require.NoError(t, err)
+
+	err = router.Start()
+	require.NoError(t, err)
+	defer func() {
+		channel.Close()
+		peerUpdates.Close()
+		require.NoError(t, router.Stop())
+	}()
+
+	// Wait for peers to come online, and ping them as they do.
+	for i := 0; i < len(peers); i++ {
+		peerUpdate := <-peerUpdates.Updates()
+		peerID := peerUpdate.PeerID
+		require.Equal(t, p2p.PeerUpdate{
+			PeerID: peerID,
+			Status: p2p.PeerStatusUp,
+		}, peerUpdate)
+
+		channel.Out() <- p2p.Envelope{To: peerID, Message: &TestMessage{Value: "hi!"}}
+		assert.Equal(t, p2p.Envelope{
+			From:    peerID,
+			Message: &TestMessage{Value: "hi!"},
+		}, (<-channel.In()).Strip())
+	}
+
+	// We then submit an error for a peer, and watch it get disconnected.
+	channel.Error() <- p2p.PeerError{
+		PeerID:   peers[0].NodeID(),
+		Err:      errors.New("test error"),
+		Severity: p2p.PeerErrorSeverityCritical,
+	}
+	peerUpdate := <-peerUpdates.Updates()
+	require.Equal(t, p2p.PeerUpdate{
+		PeerID: peers[0].NodeID(),
+		Status: p2p.PeerStatusDown,
+	}, peerUpdate)
+
+	// We now broadcast a message, which we should receive back from only the
+	// two remaining peers.
+	channel.Out() <- p2p.Envelope{
+		Broadcast: true,
+		Message:   &TestMessage{Value: "broadcast"},
+	}
+	for i := 0; i < len(peers)-1; i++ {
+		envelope := <-channel.In()
+		require.NotEqual(t, peers[0].NodeID(), envelope.From)
+		require.Equal(t, &TestMessage{Value: "broadcast"}, envelope.Message)
+	}
+	select {
+	case envelope := <-channel.In():
+		t.Errorf("unexpected message: %v", envelope)
+	default:
+	}
+}
diff --git a/p2p/transport.go b/p2p/transport.go
index 8d49b9538..5e15444fd 100644
--- a/p2p/transport.go
+++ b/p2p/transport.go
@@ -11,9 +11,15 @@ import (
 	"github.com/tendermint/tendermint/p2p/conn"
 )
 
+const (
+	defaultProtocol Protocol = MConnProtocol
+)
+
 // Transport is an arbitrary mechanism for exchanging bytes with a peer.
 type Transport interface {
-	// Accept waits for the next inbound connection on a listening endpoint.
+	// Accept waits for the next inbound connection on a listening endpoint. If
+	// this returns io.EOF or ErrTransportClosed, the transport should be
+	// considered closed and further Accept() calls are futile.
 	Accept(context.Context) (Connection, error)
 
 	// Dial creates an outbound connection to an endpoint.
@@ -60,21 +66,27 @@ type Endpoint struct {
 	Port uint16
 }
 
-// String formats an endpoint as a URL string.
-func (e Endpoint) String() string {
-	u := url.URL{Scheme: string(e.Protocol)}
-	if e.PeerID != "" {
-		u.User = url.User(string(e.PeerID))
+// PeerAddress converts the endpoint into a peer address URL.
+func (e Endpoint) PeerAddress() PeerAddress {
+	u := &url.URL{
+		Scheme: string(e.Protocol),
+		User:   url.User(string(e.PeerID)),
 	}
-	if len(e.IP) > 0 {
+	if e.IP != nil {
 		u.Host = e.IP.String()
 		if e.Port > 0 {
 			u.Host = net.JoinHostPort(u.Host, fmt.Sprintf("%v", e.Port))
 		}
-	} else if e.Path != "" {
+		u.Path = e.Path
+	} else {
 		u.Opaque = e.Path
 	}
-	return u.String()
+	return PeerAddress{URL: u}
+}
+
+// String formats an endpoint as a URL string.
+func (e Endpoint) String() string {
+	return e.PeerAddress().URL.String()
 }
 
 // Validate validates an endpoint.
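
Finally, a round trip between the two representations (illustrative; peerID is assumed to be a valid node ID):

func exampleEndpointAddress(peerID NodeID) {
	endpoint := Endpoint{
		Protocol: MConnProtocol,
		PeerID:   peerID,
		IP:       net.IPv4(10, 0, 0, 1),
		Port:     26657,
	}
	// Renders as "mconn://<peerID>@10.0.0.1:26657".
	address := endpoint.PeerAddress()
	fmt.Println(address.String())
	// The address maps back to the same node ID.
	fmt.Println(address.NodeID() == peerID)
}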