diff --git a/docs/architecture/adr-062-p2p-architecture.md b/docs/architecture/adr-062-p2p-architecture.md index 5fae7301d..dafdaf103 100644 --- a/docs/architecture/adr-062-p2p-architecture.md +++ b/docs/architecture/adr-062-p2p-architecture.md @@ -8,6 +8,8 @@ - 2020-11-16: Notes on recommended reactor implementation patterns, approve ADR (@erikgrinaker) +- 2021-02-04: Update with new P2P core and Transport API changes (@erikgrinaker). + ## Context In [ADR 061](adr-061-p2p-refactor-scope.md) we decided to refactor the peer-to-peer (P2P) networking stack. The first phase is to redesign and refactor the internal P2P architecture, while retaining protocol compatibility as far as possible. @@ -20,13 +22,15 @@ Several variations of the proposed design were considered, including e.g. callin There were also proposals to use LibP2P instead of maintaining our own P2P stack, which were rejected (for now) in [ADR 061](adr-061-p2p-refactor-scope.md). +The initial version of this ADR had a byte-oriented multi-stream transport API, but this had to be abandoned/postponed to maintain backwards-compatibility with the existing MConnection protocol which is message-oriented. See the rejected RFC in [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details. + ## Decision -The P2P stack will be redesigned as a message-oriented architecture, primarily relying on Go channels for communication and scheduling. It will use IO stream transports to exchange raw bytes with individual peers, bidirectional peer-addressable channels to send and receive Protobuf messages, and a router to route messages between reactors and peers. Message passing is asynchronous with at-most-once delivery. +The P2P stack will be redesigned as a message-oriented architecture, primarily relying on Go channels for communication and scheduling. It will use a message-oriented transport to binary messages with individual peers, bidirectional peer-addressable channels to send and receive Protobuf messages, a router to route messages between reactors and peers, and a peer manager to manage peer lifecycle information. Message passing is asynchronous with at-most-once delivery. ## Detailed Design -This ADR is primarily concerned with the architecture and interfaces of the P2P stack, not implementation details. Separate ADRs may be submitted for individual components, since implementation may be non-trivial. The interfaces described here should therefore be considered a rough architecture outline, not a complete and final design. +This ADR is primarily concerned with the architecture and interfaces of the P2P stack, not implementation details. The interfaces described here should therefore be considered a rough architecture outline, not a complete and final design. Primary design objectives have been: @@ -39,10 +43,10 @@ Primary design objectives have been: The main abstractions in the new stack are: -* `peer`: A node in the network, uniquely identified by a `PeerID` and stored in a `peerStore`. -* `Transport`: An arbitrary mechanism to exchange bytes with a peer using IO `Stream`s across a `Connection`. -* `Channel`: A bidirectional channel to asynchronously exchange Protobuf messages with peers addressed with `PeerID`. +* `Transport`: An arbitrary mechanism to exchange binary messages with a peer across a `Connection`. +* `Channel`: A bidirectional channel to asynchronously exchange Protobuf messages with peers using node ID addressing. * `Router`: Maintains transport connections to relevant peers and routes channel messages. +* `PeerManager`: Manages peer lifecycle information, e.g. deciding which peers to dial and when, using a `peerStore` for storage. * Reactor: A design pattern loosely defined as "something which listens on a channel and reacts to messages". These abstractions are illustrated in the following diagram (representing the internals of node A) and described in detail below. @@ -51,33 +55,42 @@ These abstractions are illustrated in the following diagram (representing the in ### Transports -Transports are arbitrary mechanisms for exchanging raw bytes with a peer. For example, a gRPC transport would connect to a peer over TCP/IP and send data using the gRPC protocol, while an in-memory transport might communicate with a peer running in another goroutine using internal byte buffers. Note that transports don't have a notion of a `peer` as such - instead, they communicate with an arbitrary endpoint address (e.g. IP address and port number), to decouple them from the rest of the P2P stack. +Transports are arbitrary mechanisms for exchanging binary messages with a peer. For example, a gRPC transport would connect to a peer over TCP/IP and send data using the gRPC protocol, while an in-memory transport might communicate with a peer running in another goroutine using internal Go channels. Note that transports don't have a notion of a "peer" or "node" as such - instead, they establish connections between arbitrary endpoint addresses (e.g. IP address and port number), to decouple them from the rest of the P2P stack. Transports must satisfy the following requirements: * Be connection-oriented, and support both listening for inbound connections and making outbound connections using endpoint addresses. -* Support multiple logical IO streams within a single connection, to take full advantage of protocols with native stream support. For example, QUIC supports multiple independent streams, while HTTP/2 and MConn multiplex logical streams onto a single TCP connection. +* Support sending binary messages with distinct channel IDs (although channels and channel IDs are a higher-level application protocol concept explained in the Router section, they are threaded through the transport layer as well for backwards compatibilty with the existing MConnection protocol). -* Provide the public key of the peer, and possibly encrypt or sign the traffic as appropriate. This should be compared with known data (e.g. the peer ID) to authenticate the peer and avoid man-in-the-middle attacks. +* Exchange the MConnection `NodeInfo` and public key via a node handshake, and possibly encrypt or sign the traffic as appropriate. -The initial transport implementation will be a port of the current MConn protocol currently used by Tendermint, and should be backwards-compatible at the wire level as far as possible. This will be followed by an in-memory transport for testing, and a QUIC transport that may eventually replace MConn. +The initial transport is a port of the current MConnection protocol currently used by Tendermint, and should be backwards-compatible at the wire level. An in-memory transport for testing has also been implemented. There are plans to explore a QUIC transport that may replace the MConnection protocol. -The `Transport` interface is: +The `Transport` interface is as follows: ```go -// Transport is an arbitrary mechanism for exchanging bytes with a peer. +// Transport is a connection-oriented mechanism for exchanging data with a peer. type Transport interface { - // Accept waits for the next inbound connection on a listening endpoint. - Accept(context.Context) (Connection, error) + // Protocols returns the protocols supported by the transport. The Router + // uses this to pick a transport for an Endpoint. + Protocols() []Protocol + + // Endpoints returns the local endpoints the transport is listening on, if any. + // How to listen is transport-dependent, e.g. MConnTransport uses Listen() while + // MemoryTransport starts listening via MemoryNetwork.CreateTransport(). + Endpoints() []Endpoint + + // Accept waits for the next inbound connection on a listening endpoint, blocking + // until either a connection is available or the transport is closed. On closure, + // io.EOF is returned and further Accept calls are futile. + Accept() (Connection, error) // Dial creates an outbound connection to an endpoint. Dial(context.Context, Endpoint) (Connection, error) - // Endpoints lists endpoints the transport is listening on. Any endpoint IP - // addresses do not need to be normalized in any way (e.g. 0.0.0.0 is - // valid), as they should be preprocessed before being advertised. - Endpoints() []Endpoint + // Close stops accepting new connections, but does not close active connections. + Close() error } ``` @@ -91,21 +104,24 @@ The `Endpoint` struct is: ```go // Endpoint represents a transport connection endpoint, either local or remote. +// +// Endpoints are not necessarily networked (see e.g. MemoryTransport) but all +// networked endpoints must use IP as the underlying transport protocol to allow +// e.g. IP address filtering. Either IP or Path (or both) must be set. type Endpoint struct { - // Protocol specifies the transport protocol, used by the router to pick a - // transport for an endpoint. + // Protocol specifies the transport protocol. Protocol Protocol - // Path is an optional, arbitrary transport-specific path or identifier. - Path string - // IP is an IP address (v4 or v6) to connect to. If set, this defines the // endpoint as a networked endpoint. IP net.IP - // Port is a network port (either TCP or UDP). If not set, a default port - // may be used depending on the protocol. + // Port is a network port (either TCP or UDP). If 0, a default port may be + // used depending on the protocol. Port uint16 + + // Path is an optional transport-specific path or identifier. + Path string } // Protocol identifies a transport protocol. @@ -114,21 +130,30 @@ type Protocol string Endpoints are arbitrary transport-specific addresses, but if they are networked they must use IP addresses and thus rely on IP as a fundamental packet routing protocol. This enables policies for address discovery, advertisement, and exchange - for example, a private `192.168.0.0/24` IP address should only be advertised to peers on that IP network, while the public address `8.8.8.8` may be advertised to all peers. Similarly, any port numbers if given must represent TCP and/or UDP port numbers, in order to use [UPnP](https://en.wikipedia.org/wiki/Universal_Plug_and_Play) to autoconfigure e.g. NAT gateways. -Non-networked endpoints (without an IP address) are considered local, and will only be advertised to other peers connecting via the same protocol. For example, an in-memory transport used for testing might have `Endpoint{Protocol: "memory", Path: "foo"}` as an address for the node "foo", and this should only be advertised to other nodes using `Protocol: "memory"`. +Non-networked endpoints (without an IP address) are considered local, and will only be advertised to other peers connecting via the same protocol. For example, the in-memory transport used for testing uses `Endpoint{Protocol: "memory", Path: "foo"}` as an address for the node "foo", and this should only be advertised to other nodes using `Protocol: "memory"`. -#### Connections and Streams +#### Connections -A connection represents an established transport connection between two endpoints (and thus two nodes), which can be used to exchange bytes via logically distinct IO streams. Connections are set up either via `Transport.Dial()` (outbound) or `Transport.Accept()` (inbound). The caller is responsible for verifying the remote peer's public key as returned by the connection, following the current MConn protocol behavior for now. +A connection represents an established transport connection between two endpoints (i.e. two nodes), which can be used to exchange binary messages with logical channel IDs (corresponding to the higher-level channel IDs used in the router). Connections are set up either via `Transport.Dial()` (outbound) or `Transport.Accept()` (inbound). -Data is exchanged over IO streams created with `Connection.Stream()`. These implement the standard Go `io.Reader` and `io.Writer` interfaces to read and write bytes. Transports are free to choose how to implement such streams, e.g. by taking advantage of native stream support in the underlying protocol or through multiplexing. +Once a connection is esablished, `Transport.Handshake()` must be called to perform a node handshake, exchanging node info and public keys to verify node identities. Node handshakes should not really be part of the transport layer (it's an application protocol concern), this exists for backwards-compatibility with the existing MConnection protocol which conflates the two. `NodeInfo` is part of the existing MConnection protocol, but does not appear to be documented in the specification -- refer to the Go codebase for details. -`Connection` and the related `Stream` interfaces are: +The `Connection` interface is shown below. It omits certain additions that are currently implemented for backwards compatibility with the legacy P2P stack and are planned to be removed before the final release. ```go // Connection represents an established connection between two endpoints. type Connection interface { - // Stream creates a new logically distinct IO stream within the connection. - Stream() (Stream, error) + // Handshake executes a node handshake with the remote peer. It must be + // called once the connection is established, and returns the remote peer's + // node info and public key. The caller is responsible for validation. + Handshake(context.Context, NodeInfo, crypto.PrivKey) (NodeInfo, crypto.PubKey, error) + + // ReceiveMessage returns the next message received on the connection, + // blocking until one is available. Returns io.EOF if closed. + ReceiveMessage() (ChannelID, []byte, error) + + // SendMessage sends a message on the connection. Returns io.EOF if closed. + SendMessage(ChannelID, []byte) error // LocalEndpoint returns the local endpoint for the connection. LocalEndpoint() Endpoint @@ -136,169 +161,181 @@ type Connection interface { // RemoteEndpoint returns the remote endpoint for the connection. RemoteEndpoint() Endpoint - // PubKey returns the public key of the remote peer. - PubKey() crypto.PubKey - // Close closes the connection. Close() error } +``` + +This ADR initially proposed a byte-oriented multi-stream connection API that follows more typical networking API conventions (using e.g. `io.Reader` and `io.Writer` interfaces which easily compose with other libraries). This would also allow moving the responsibility for message framing, node handshakes, and traffic scheduling to the common router instead of reimplementing this across transports, and would allow making better use of multi-stream protocols such as QUIC. However, this would require minor breaking changes to the MConnection protocol which were rejected, see [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details. This should be revisited when starting work on a QUIC transport. + +### Peer Management + +Peers are other Tendermint nodes. Each peer is identified by a unique `NodeID` (tied to the node's private key). + +#### Peer Addresses + +Nodes have one or more `NodeAddress` addresses expressed as URLs that they can be reached at. Examples of node addresses might be e.g.: + +* `mconn://nodeid@host.domain.com:25567/path` +* `memory:nodeid` + +Addresses are resolved into one or more transport endpoints, e.g. by resolving DNS hostnames into IP addresses. Peers should always be expressed as address URLs rather than endpoints (which are a lower-level transport construct). + +```go +// NodeID is a hex-encoded crypto.Address. It must be lowercased +// (for uniqueness) and of length 40. +type NodeID string -// Stream represents a single logical IO stream within a connection. -type Stream interface { - io.Reader // Read([]byte) (int, error) - io.Writer // Write([]byte) (int, error) - io.Closer // Close() error +// NodeAddress is a node address URL. It differs from a transport Endpoint in +// that it contains the node's ID, and that the address hostname may be resolved +// into multiple IP addresses (and thus multiple endpoints). +// +// If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part +// is expected to contain a node ID. +type NodeAddress struct { + NodeID NodeID + Protocol Protocol + Hostname string + Port uint16 + Path string } + +// ParseNodeAddress parses a node address URL into a NodeAddress, normalizing +// and validating it. +func ParseNodeAddress(urlString string) (NodeAddress, error) + +// Resolve resolves a NodeAddress into a set of Endpoints, e.g. by expanding +// out a DNS hostname to IP addresses. +func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error) ``` -### Peers +#### Peer Manager + +The P2P stack needs to track a lot of internal state about peers, such as their addresses, connection state, priorities, availability, failures, retries, and so on. This responsibility has been separated out to a `PeerManager`, which track this state for the `Router` (but does not maintain the actual transport connections themselves, which is the router's responsibility). + +The `PeerManager` is a synchronous state machine, where all state transitions are serialized (implemented as synchronous method calls holding an exclusive mutex lock). Most peer state is intentionally kept internal, stored in a `peerStore` database that persists it as appropriate, and the external interfaces pass the minimum amount of information necessary in order to avoid shared state between router goroutines. This design significantly simplifies the model, making it much easier to reason about and test than if it was baked into the asynchronous ball of concurrency that the P2P networking core must necessarily be. As peer lifecycle events are expected to be relatively infrequent, this should not significantly impact performance either. -Peers are other Tendermint network nodes. Each peer is identified by a unique `PeerID`, and has a set of `PeerAddress` addresses expressed as URLs that they can be reached at. Examples of peer addresses might be e.g.: +The `Router` uses the `PeerManager` to request which peers to dial and evict, and reports in with peer lifecycle events such as connections, disconnections, and failures as they occur. The manager can reject these events (e.g. reject an inbound connection) by returning errors. This happens as follows: -* `mconn://b10c@host.domain.com:25567/path` -* `unix:///var/run/tendermint/peer.sock` -* `memory:testpeer` +* Outbound connections, via `Transport.Dial`: + * `DialNext()`: returns a peer address to dial, or blocks until one is available. + * `DialFailed()`: reports a peer dial failure. + * `Dialed()`: reports a peer dial success. + * `Ready()`: reports the peer as routed and ready. + * `Disconnected()`: reports a peer disconnection. -Addresses are resolved into one or more transport endpoints, e.g. by resolving DNS hostnames into IP addresses (which should be refreshed periodically). Peers should always be expressed as address URLs, and never as endpoints which are a lower-level construct. +* Inbound connections, via `Transport.Accept`: + * `Accepted()`: reports an inbound peer connection. + * `Ready()`: reports the peer as routed and ready. + * `Disconnected()`: reports a peer disconnection. + +* Evictions, via `Connection.Close`: + * `EvictNext()`: returns a peer to disconnect, or blocks until one is available. + * `Disconnected()`: reports a peer disconnection. + +These calls have the following interface: ```go -// PeerID is a unique peer ID, generally expressed in hex form. -type PeerID []byte +// DialNext returns a peer address to dial, blocking until one is available. +func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) -// PeerAddress is a peer address URL. The User field, if set, gives the -// hex-encoded remote PeerID, which should be verified with the remote peer's -// public key as returned by the connection. -type PeerAddress url.URL +// DialFailed reports a dial failure for the given address. +func (m *PeerManager) DialFailed(address NodeAddress) error -// Resolve resolves a PeerAddress into a set of Endpoints, typically by -// expanding out a DNS name in Host to its IP addresses. Field mapping: -// -// Scheme → Endpoint.Protocol -// Host → Endpoint.IP -// Port → Endpoint.Port -// Path+Query+Fragment,Opaque → Endpoint.Path -// -func (a PeerAddress) Resolve(ctx context.Context) []Endpoint { return nil } +// Dialed reports a successful outbound connection to the given address. +func (m *PeerManager) Dialed(address NodeAddress) error + +// Accepted reports a successful inbound connection from the given node. +func (m *PeerManager) Accepted(peerID NodeID) error + +// Ready reports the peer as fully routed and ready for use. +func (m *PeerManager) Ready(peerID NodeID) error + +// EvictNext returns a peer ID to disconnect, blocking until one is available. +func (m *PeerManager) EvictNext(ctx context.Context) (NodeID, error) + +// Disconnected reports a peer disconnection. +func (m *PeerManager) Disconnected(peerID NodeID) error ``` -The P2P stack needs to track a lot of internal information about peers, such as endpoints, status, priorities, and so on. This is done in an internal `peer` struct, which should not be exposed outside of the `p2p` package (e.g. to reactors) in order to avoid race conditions and lock contention - other packages should use `PeerID`. +Internally, the `PeerManager` uses a numeric peer score to prioritize peers, e.g. when deciding which peers to dial next. The scoring policy has not yet been implemented, but should take into account e.g. node configuration such a `persistent_peers`, uptime and connection failures, performance, and so on. The manager will also attempt to automatically upgrade to better-scored peers by evicting lower-scored peers when a better one becomes available (e.g. when a persistent peer comes back online after an outage). + +The `PeerManager` should also have an API for reporting peer behavior from reactors that affects its score (e.g. signing a block increases the score, double-voting decreases it or even bans the peer), but this has not yet been designed and implemented. -The `peer` struct might look like the following, but is intentionally underspecified and will depend on implementation requirements (for example, it will almost certainly have to track statistics about connection failures and retries): +Additionally, the `PeerManager` provides `PeerUpdates` subscriptions that will receive `PeerUpdate` events whenever significant peer state changes happen. Reactors can use these e.g. to know when peers are connected or disconnected, and take appropriate action. This is currently fairly minimal: ```go -// peer tracks internal status information about a peer. -type peer struct { - ID PeerID - Status PeerStatus - Priority PeerPriority - Endpoints map[PeerAddress][]Endpoint // Resolved endpoints by address. +// Subscribe subscribes to peer updates. The caller must consume the peer updates +// in a timely fashion and close the subscription when done, to avoid stalling the +// PeerManager as delivery is semi-synchronous, guaranteed, and ordered. +func (m *PeerManager) Subscribe() *PeerUpdates + +// PeerUpdate is a peer update event sent via PeerUpdates. +type PeerUpdate struct { + NodeID NodeID + Status PeerStatus } -// PeerStatus specifies peer statuses. +// PeerStatus is a peer status. type PeerStatus string const ( - PeerStatusNew = "new" // New peer which we haven't tried to contact yet. - PeerStatusUp = "up" // Peer which we have an active connection to. - PeerStatusDown = "down" // Peer which we're temporarily disconnected from. - PeerStatusRemoved = "removed" // Peer which has been removed. - PeerStatusBanned = "banned" // Peer which is banned for misbehavior. + PeerStatusUp PeerStatus = "up" // Connected and ready. + PeerStatusDown PeerStatus = "down" // Disconnected. ) -// PeerPriority specifies peer priorities. -type PeerPriority int +// PeerUpdates is a real-time peer update subscription. +type PeerUpdates struct { ... } -const ( - PeerPriorityNormal PeerPriority = iota + 1 - PeerPriorityValidator - PeerPriorityPersistent -) -``` +// Updates returns a channel for consuming peer updates. +func (pu *PeerUpdates) Updates() <-chan PeerUpdate -Peer information is stored in a `peerStore`, which may be persisted in an underlying database, and will replace the current address book either partially or in full. It is kept internal to avoid race conditions and tight coupling, and should at the very least contain basic CRUD functionality as outlined below, but will likely need additional functionality and is intentionally underspecified: - -```go -// peerStore contains information about peers, possibly persisted to disk. -type peerStore struct { - peers map[string]*peer // Entire set in memory, with PeerID.String() keys. - db dbm.DB // Database for persistence, if non-nil. -} - -func (p *peerStore) Delete(id PeerID) error { return nil } -func (p *peerStore) Get(id PeerID) (peer, bool) { return peer{}, false } -func (p *peerStore) List() []peer { return nil } -func (p *peerStore) Set(peer peer) error { return nil } +// Close closes the peer updates subscription. +func (pu *PeerUpdates) Close() ``` -Peer address detection, advertisement and exchange (including detection of externally-reachable addresses via e.g. NAT gateways) is out of scope for this ADR, but may be covered in a separate ADR. The current PEX reactor should probably be absorbed into the core P2P stack and protocol instead of running as a separate reactor, since this needs to mutate the core peer data structures and will thus be tightly coupled with the router. +The `PeerManager` will also be responsible for providing peer information to the PEX reactor that can be gossipped to other nodes. This requires an improved system for peer address detection and advertisement, that e.g. reliably detects peer and self addresses and only gossips private network addresses to other peers on the same network, but this system has not yet been fully designed and implemented. ### Channels -While low-level data exchange happens via transport IO streams, the high-level API is based on a bidirectional `Channel` that can send and receive Protobuf messages addressed by `PeerID`. A channel is identified by an arbitrary `ChannelID` identifier, and can exchange Protobuf messages of one specific type (since the type to unmarshal into must be known). Message delivery is asynchronous and at-most-once. +While low-level data exchange happens via the `Transport`, the high-level API is based on a bidirectional `Channel` that can send and receive Protobuf messages addressed by `NodeID`. A channel is identified by an arbitrary `ChannelID` identifier, and can exchange Protobuf messages of one specific type (since the type to unmarshal into must be predefined). Message delivery is asynchronous and at-most-once. -The channel can also be used to report peer errors, e.g. when receiving an invalid or malignant message. This may cause the peer to be disconnected or banned depending on the router's policy. +The channel can also be used to report peer errors, e.g. when receiving an invalid or malignant message. This may cause the peer to be disconnected or banned depending on `PeerManager` policy, but should probably be replaced by a broader peer behavior API that can also report good behavior. A `Channel` has this interface: ```go -// Channel is a bidirectional channel for Protobuf message exchange with peers. -type Channel struct { - // ID contains the channel ID. - ID ChannelID - - // messageType specifies the type of messages exchanged via the channel, and - // is used e.g. for automatic unmarshaling. - messageType proto.Message - - // In is a channel for receiving inbound messages. Envelope.From is always - // set. - In <-chan Envelope - - // Out is a channel for sending outbound messages. Envelope.To or Broadcast - // must be set, otherwise the message is discarded. - Out chan<- Envelope +// ChannelID is an arbitrary channel ID. +type ChannelID uint16 - // Error is a channel for reporting peer errors to the router, typically used - // when peers send an invalid or malignant message. - Error chan<- PeerError +// Channel is a bidirectional channel to exchange Protobuf messages with peers. +type Channel struct { + ID ChannelID // Channel ID. + In <-chan Envelope // Inbound messages (peers to reactors). + Out chan<- Envelope // outbound messages (reactors to peers) + Error chan<- PeerError // Peer error reporting. + messageType proto.Message // Channel's message type, for e.g. unmarshalling. } -// Close closes the channel, and is equivalent to close(Channel.Out). This will -// cause Channel.In to be closed when appropriate. The ID can then be reused. -func (c *Channel) Close() error { return nil } - -// ChannelID is an arbitrary channel ID. -type ChannelID uint16 +// Close closes the channel, also closing Out and Error. +func (c *Channel) Close() error // Envelope specifies the message receiver and sender. type Envelope struct { - From PeerID // Message sender, or empty for outbound messages. - To PeerID // Message receiver, or empty for inbound messages. - Broadcast bool // Send message to all connected peers, ignoring To. - Message proto.Message // Payload. + From NodeID // Sender (empty if outbound). + To NodeID // Receiver (empty if inbound). + Broadcast bool // Send to all connected peers, ignoring To. + Message proto.Message // Message payload. } -// PeerError is a peer error reported by a reactor via the Error channel. The -// severity may cause the peer to be disconnected or banned depending on policy. +// PeerError is a peer error reported via the Error channel. type PeerError struct { - PeerID PeerID + NodeID NodeID Err error - Severity PeerErrorSeverity } - -// PeerErrorSeverity determines the severity of a peer error. -type PeerErrorSeverity string - -const ( - PeerErrorSeverityLow PeerErrorSeverity = "low" // Mostly ignored. - PeerErrorSeverityHigh PeerErrorSeverity = "high" // May disconnect. - PeerErrorSeverityCritical PeerErrorSeverity = "critical" // Ban. -) ``` -A channel can reach any connected peer, and is implemented using transport streams against each individual peer, with an initial handshake to exchange the channel ID and any other metadata. The channel will automatically (un)marshal Protobuf to byte slices and use length-prefixed framing (the de facto standard for Protobuf streams) when writing them to the stream. - -Message scheduling and queueing is left as an implementation detail, and can use any number of algorithms such as FIFO, round-robin, priority queues, etc. Since message delivery is not guaranteed, both inbound and outbound messages may be dropped, buffered, or blocked as appropriate. +A channel can reach any connected peer, and will automatically (un)marshal the Protobuf messages. Message scheduling and queueing is a `Router` implementation concern, and can use any number of algorithms such as FIFO, round-robin, priority queues, etc. Since message delivery is not guaranteed, both inbound and outbound messages may be dropped, buffered, reordered, or blocked as appropriate. Since a channel can only exchange messages of a single type, it is often useful to use a wrapper message type with e.g. a Protobuf `oneof` field that specifies a set of inner message types that it can contain. The channel can automatically perform this (un)wrapping if the outer message type implements the `Wrapper` interface (see [Reactor Example](#reactor-example) for an example): @@ -308,6 +345,8 @@ Since a channel can only exchange messages of a single type, it is often useful // automatically (un)wrap passed messages using the container type, such that // the channel can transparently support multiple message types. type Wrapper interface { + proto.Message + // Wrap will take a message and wrap it in this one. Wrap(proto.Message) error @@ -318,41 +357,109 @@ type Wrapper interface { ### Routers -The router manages all P2P networking for a node, and is responsible for keeping track of network peers, maintaining transport connections, and routing channel messages. As such, it must do e.g. connection retries and backoff, message QoS scheduling and backpressure, peer quality assessments, and endpoint detection and advertisement. In addition, the router provides a mechanism to subscribe to peer updates (e.g. peers connecting or disconnecting), and handles reported peer errors from reactors. +The router exeutes P2P networking for a node, taking instructions from and reporting events to the `PeerManager`, maintaining transport connections to peers, and routing messages between channels and peers. -The implementation of the router is likely to be non-trivial, and is intentionally unspecified here. A separate ADR will likely be submitted for this. It is unclear whether message routing/scheduling and peer lifecycle management can be split into two separate components, or if these need to be tightly coupled. +Practically all concurrency in the P2P stack has been moved into the router and reactors, while as many other responsibilities as possible have been moved into separate components such as the `Transport` and `PeerManager` that can remain largely synchronous. Limiting concurrency to a single core component makes it much easier to reason about since there is only a single concurrency structure, while the remaining components can be serial, simple, and easily testable. -The `Router` API is as follows: +The `Router` has a very minimal API, since it is mostly driven by `PeerManager` and `Transport` events: ```go -// Router manages connections to peers and routes Protobuf messages between them -// and local reactors. It also provides peer status updates and error reporting. -type Router struct{} +// Router maintains peer transport connections and routes messages between +// peers and channels. +type Router struct { + // Some details have been omitted below. + + logger log.Logger + options RouterOptions + nodeInfo NodeInfo + privKey crypto.PrivKey + peerManager *PeerManager + transports []Transport + + peerMtx sync.RWMutex + peerQueues map[NodeID]queue + + channelMtx sync.RWMutex + channelQueues map[ChannelID]queue +} -// NewRouter creates a new router, using the given peer store to track peers. -// Transports must be pre-initialized to listen on appropriate endpoints. -func NewRouter(peerStore *peerStore, transports map[Protocol]Transport) *Router { return nil } +// OpenChannel opens a new channel for the given message type. The caller must +// close the channel when done, before stopping the Router. messageType is the +// type of message passed through the channel. +func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) -// Channel opens a new channel with the given ID. messageType should be an empty -// Protobuf message of the type that will be passed through the channel. The -// message can implement Wrapper for automatic message (un)wrapping. -func (r *Router) Channel(id ChannelID, messageType proto.Message) (*Channel, error) { return nil, nil } +// Start starts the router, connecting to peers and routing messages. +func (r *Router) Start() error -// PeerUpdates returns a channel with peer updates. The caller must cancel the -// context to end the subscription, and keep consuming messages in a timely -// fashion until the channel is closed to avoid blocking updates. -func (r *Router) PeerUpdates(ctx context.Context) PeerUpdates { return nil } +// Stop stops the router, disconnecting from all peers and stopping message routing. +func (r *Router) Stop() error +``` -// PeerUpdates is a channel for receiving peer updates. -type PeerUpdates <-chan PeerUpdate +All Go channel sends in the `Router` and reactors are blocking (the router also selects on signal channels for closure and shutdown). The responsibility for message scheduling, prioritization, backpressure, and load shedding is centralized in a core `queue` interface that is used at contention points (i.e. from all peers to a single channel, and from all channels to a single peer): -// PeerUpdate is a peer status update for reactors. -type PeerUpdate struct { - PeerID PeerID - Status PeerStatus +```go +// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according +// to some policy. Queues are used at contention points, i.e.: +// - Receiving inbound messages to a single channel from all peers. +// - Sending outbound messages to a single peer from all channels. +type queue interface { + // enqueue returns a channel for submitting envelopes. + enqueue() chan<- Envelope + + // dequeue returns a channel ordered according to some queueing policy. + dequeue() <-chan Envelope + + // close closes the queue. After this call enqueue() will block, so the + // caller must select on closed() as well to avoid blocking forever. The + // enqueue() and dequeue() channels will not be closed. + close() + + // closed returns a channel that's closed when the scheduler is closed. + closed() <-chan struct{} } ``` +The current implementation is `fifoQueue`, which is a simple unbuffered lossless queue that passes messages in the order they were received and blocks until the message is delivered (i.e. it is a Go channel). The router will need a more sophisticated queueing policy, but this has not yet been implemented. + +The internal `Router` goroutine structure and design is described in the `Router` GoDoc, which is included below for reference: + +```go +// On startup, three main goroutines are spawned to maintain peer connections: +// +// dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer +// address to dial and spawns a goroutine that dials the peer, handshakes +// with it, and begins to route messages if successful. +// +// acceptPeers(): in a loop, waits for an inbound connection via +// Transport.Accept() and spawns a goroutine that handshakes with it and +// begins to route messages if successful. +// +// evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next +// peer to evict, and disconnects it by closing its message queue. +// +// When a peer is connected, an outbound peer message queue is registered in +// peerQueues, and routePeer() is called to spawn off two additional goroutines: +// +// sendPeer(): waits for an outbound message from the peerQueues queue, +// marshals it, and passes it to the peer transport which delivers it. +// +// receivePeer(): waits for an inbound message from the peer transport, +// unmarshals it, and passes it to the appropriate inbound channel queue +// in channelQueues. +// +// When a reactor opens a channel via OpenChannel, an inbound channel message +// queue is registered in channelQueues, and a channel goroutine is spawned: +// +// routeChannel(): waits for an outbound message from the channel, looks +// up the recipient peer's outbound message queue in peerQueues, and submits +// the message to it. +// +// All channel sends in the router are blocking. It is the responsibility of the +// queue interface in peerQueues and channelQueues to prioritize and drop +// messages as appropriate during contention to prevent stalls and ensure good +// quality of service. +``` + ### Reactor Example While reactors are a first-class concept in the current P2P stack (i.e. there is an explicit `p2p.Reactor` interface), they will simply be a design pattern in the new stack, loosely defined as "something which listens on a channel and reacts to messages". @@ -409,21 +516,21 @@ The reactor itself would be implemented e.g. like this: ```go // RunEchoReactor wires up an echo reactor to a router and runs it. -func RunEchoReactor(router *p2p.Router) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - channel, err := router.Channel(1, &EchoMessage{}) +func RunEchoReactor(router *p2p.Router, peerManager *p2p.PeerManager) error { + channel, err := router.OpenChannel(1, &EchoMessage{}) if err != nil { return err } defer channel.Close() + peerUpdates := peerManager.Subscribe() + defer peerUpdates.Close() - return EchoReactor(ctx, channel, router.PeerUpdates(ctx)) + return EchoReactor(context.Background(), channel, peerUpdates) } -// EchoReactor provides an echo service, pinging all known peers until cancelled. -func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.PeerUpdates) error { +// EchoReactor provides an echo service, pinging all known peers until the given +// context is cancelled. +func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates *p2p.PeerUpdates) error { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() @@ -451,9 +558,8 @@ func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.Peer default: channel.Error <- PeerError{ - PeerID: envelope.From, - Err: fmt.Errorf("unexpected message %T", msg), - Severity: PeerErrorSeverityLow, + PeerID: envelope.From, + Err: fmt.Errorf("unexpected message %T", msg), } } @@ -469,31 +575,9 @@ func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates p2p.Peer } ``` -### Implementation Plan - -The existing P2P stack should be gradually migrated towards this design. The easiest path would likely be: - -1. Implement the `Channel` and `PeerUpdates` APIs as shims on top of the current `Switch` and `Peer` APIs, and rewrite all reactors to use them instead. - -2. Port the `privval` package to no longer use `SecretConnection` (e.g. by using gRPC instead), or temporarily duplicate its functionality. - -3. Rewrite the current MConn connection and transport code to use the new `Transport` API, and migrate existing code to use it instead. - -4. Implement the new `peer` and `peerStore` APIs, and either make the current address book a shim on top of these or replace it. - -5. Replace the existing `Switch` abstraction with the new `Router`. - -6. Move the PEX reactor and other address advertisement/exchange into the P2P core, possibly the `Router`. - -7. Consider rewriting and/or cleaning up reactors and other P2P-related code to make better use of the new abstractions. - -A note on backwards-compatibility: the current MConn protocol takes whole messages expressed as byte slices and splits them up into `PacketMsg` messages, where the final packet of a message has `PacketMsg.EOF` set. In order to maintain wire-compatibility with this protocol, the MConn transport needs to be aware of message boundaries, even though it does not care what the messages actually are. One way to handle this is to break abstraction boundaries and have the transport decode the input's length-prefixed message framing and use this to determine message boundaries, unless we accept breaking the protocol here. - -Similarly, implementing channel handshakes with the current MConn protocol would require doing an initial connection handshake as today and use that information to "fake" the local channel handshake without it hitting the wire. - ## Status -Accepted +Partially implemented ([#5670](https://github.com/tendermint/tendermint/issues/5670)) ## Consequences @@ -519,7 +603,7 @@ Accepted * A complete overhaul of P2P internals is likely to cause temporary performance regressions and bugs as the implementation matures. -* Hiding peer management information inside the `p2p` package may prevent certain functionality or require additional deliberate interfaces for information exchange, as a tradeoff to simplify the design, reduce coupling, and avoid race conditions and lock contention. +* Hiding peer management information inside the `PeerManager` may prevent certain functionality or require additional deliberate interfaces for information exchange, as a tradeoff to simplify the design, reduce coupling, and avoid race conditions and lock contention. ### Neutral diff --git a/docs/architecture/img/adr-062-architecture.svg b/docs/architecture/img/adr-062-architecture.svg index 1ad18a3e0..4a824eee0 100644 --- a/docs/architecture/img/adr-062-architecture.svg +++ b/docs/architecture/img/adr-062-architecture.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file