# ADR 062: P2P Architecture and Abstractions

## Changelog

- 2020-11-09: Initial version (@erikgrinaker)
- 2020-11-13: Remove stream IDs, move peer errors onto channel, note on moving PEX into core (@erikgrinaker)
- 2020-11-16: Notes on recommended reactor implementation patterns, approve ADR (@erikgrinaker)
- 2021-02-04: Update with new P2P core and Transport API changes (@erikgrinaker)

## Context

In [ADR 061](adr-061-p2p-refactor-scope.md) we decided to refactor the peer-to-peer (P2P) networking stack. The first phase is to redesign and refactor the internal P2P architecture, while retaining protocol compatibility as far as possible.

## Alternative Approaches

Several variations of the proposed design were considered, including e.g. calling interface methods instead of passing messages (like the current architecture), merging channels with streams, exposing the internal peer data structure to reactors, being message format-agnostic via arbitrary codecs, and so on. This design was chosen because it has very loose coupling, is simpler to reason about and more convenient to use, avoids race conditions and lock contention for internal data structures, gives reactors better control of message ordering and processing semantics, and allows for QoS scheduling and backpressure in a very natural way.

[multiaddr](https://github.com/multiformats/multiaddr) was considered as a transport-agnostic peer address format over regular URLs, but it does not appear to have very widespread adoption, and advanced features like protocol encapsulation and tunneling do not appear to be immediately useful to us.

There were also proposals to use LibP2P instead of maintaining our own P2P stack, which were rejected (for now) in [ADR 061](adr-061-p2p-refactor-scope.md).

The initial version of this ADR had a byte-oriented multi-stream transport API, but this had to be abandoned/postponed to maintain backwards-compatibility with the existing MConnection protocol, which is message-oriented. See the rejected RFC in [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details.

## Decision

The P2P stack will be redesigned as a message-oriented architecture, primarily relying on Go channels for communication and scheduling. It will use a message-oriented transport to exchange binary messages with individual peers, bidirectional peer-addressable channels to send and receive Protobuf messages, a router to route messages between reactors and peers, and a peer manager to manage peer lifecycle information. Message passing is asynchronous with at-most-once delivery.

## Detailed Design

This ADR is primarily concerned with the architecture and interfaces of the P2P stack, not implementation details. The interfaces described here should therefore be considered a rough architecture outline, not a complete and final design.

Primary design objectives have been:

* Loose coupling between components, for a simpler, more robust, and test-friendly architecture.
* Pluggable transports (not necessarily networked).
* Better scheduling of messages, with improved prioritization, backpressure, and performance.
* Centralized peer lifecycle and connection management.
* Better peer address detection, advertisement, and exchange.
* Wire-level backwards compatibility with current P2P network protocols, except where it proves too obstructive.

The main abstractions in the new stack are:

* `Transport`: An arbitrary mechanism to exchange binary messages with a peer across a `Connection`.
* `Channel`: A bidirectional channel to asynchronously exchange Protobuf messages with peers using node ID addressing.
* `Router`: Maintains transport connections to relevant peers and routes channel messages.
* `PeerManager`: Manages peer lifecycle information, e.g. deciding which peers to dial and when, using a `peerStore` for storage.
* Reactor: A design pattern loosely defined as "something which listens on a channel and reacts to messages".

These abstractions are illustrated in the following diagram (representing the internals of node A) and described in detail below.

![P2P Architecture Diagram](img/adr-062-architecture.svg)

### Transports

Transports are arbitrary mechanisms for exchanging binary messages with a peer. For example, a gRPC transport would connect to a peer over TCP/IP and send data using the gRPC protocol, while an in-memory transport might communicate with a peer running in another goroutine using internal Go channels. Note that transports don't have a notion of a "peer" or "node" as such - instead, they establish connections between arbitrary endpoint addresses (e.g. IP address and port number), to decouple them from the rest of the P2P stack.

Transports must satisfy the following requirements:

* Be connection-oriented, and support both listening for inbound connections and making outbound connections using endpoint addresses.
* Support sending binary messages with distinct channel IDs (although channels and channel IDs are a higher-level application protocol concept explained in the Router section, they are threaded through the transport layer as well for backwards compatibility with the existing MConnection protocol).
* Exchange the MConnection `NodeInfo` and public key via a node handshake, and possibly encrypt or sign the traffic as appropriate.

The initial transport is a port of the MConnection protocol currently used by Tendermint, and should be backwards-compatible at the wire level. An in-memory transport for testing has also been implemented. There are plans to explore a QUIC transport that may replace the MConnection protocol.

The `Transport` interface is as follows:

```go
// Transport is a connection-oriented mechanism for exchanging data with a peer.
type Transport interface {
    // Protocols returns the protocols supported by the transport. The Router
    // uses this to pick a transport for an Endpoint.
    Protocols() []Protocol

    // Endpoints returns the local endpoints the transport is listening on, if any.
    // How to listen is transport-dependent, e.g. MConnTransport uses Listen() while
    // MemoryTransport starts listening via MemoryNetwork.CreateTransport().
    Endpoints() []Endpoint

    // Accept waits for the next inbound connection on a listening endpoint, blocking
    // until either a connection is available or the transport is closed. On closure,
    // io.EOF is returned and further Accept calls are futile.
    Accept() (Connection, error)

    // Dial creates an outbound connection to an endpoint.
    Dial(context.Context, Endpoint) (Connection, error)

    // Close stops accepting new connections, but does not close active connections.
    Close() error
}
```

How the transport configures listening is transport-dependent, and not covered by the interface. This typically happens during transport construction, where a single instance of the transport is created and set to listen on an appropriate network interface before being passed to the router.

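
As a rough illustration of the `Accept()` contract described above, the following sketch runs an accept loop that stops cleanly once the transport reports `io.EOF`. The `acceptor` interface, `acceptLoop`, `fakeConn`, and `fakeTransport` are hypothetical names introduced for this example only; a real transport would block in `Accept()` rather than return immediately, and a real router would hand each connection to its own goroutine.

```go
package main

import (
    "errors"
    "fmt"
    "io"
)

// Connection is a stand-in for the ADR's Connection interface; only Close
// is needed for this sketch.
type Connection interface {
    Close() error
}

// acceptor is the subset of Transport used by the accept loop (hypothetical).
type acceptor interface {
    Accept() (Connection, error)
}

// acceptLoop passes inbound connections to handle until the transport is
// closed, which Accept signals by returning io.EOF. It returns the number of
// accepted connections, and any error other than io.EOF.
func acceptLoop(t acceptor, handle func(Connection)) (int, error) {
    accepted := 0
    for {
        conn, err := t.Accept()
        if errors.Is(err, io.EOF) {
            return accepted, nil // Transport closed, further calls are futile.
        }
        if err != nil {
            return accepted, err
        }
        accepted++
        handle(conn)
    }
}

// fakeConn and fakeTransport are test doubles: the transport yields its
// pending connections and then behaves as if it had been closed.
type fakeConn struct{ id int }

func (fakeConn) Close() error { return nil }

type fakeTransport struct{ pending []Connection }

func (t *fakeTransport) Accept() (Connection, error) {
    if len(t.pending) == 0 {
        return nil, io.EOF
    }
    conn := t.pending[0]
    t.pending = t.pending[1:]
    return conn, nil
}

func main() {
    transport := &fakeTransport{pending: []Connection{fakeConn{1}, fakeConn{2}}}
    n, err := acceptLoop(transport, func(c Connection) {
        fmt.Printf("accepted %v\n", c)
    })
    fmt.Println(n, err)
}
```
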
#### Endpoints

`Endpoint` represents a transport endpoint (e.g. an IP address and port). A connection always has two endpoints: one at the local node and one at the remote peer. Outbound connections to remote endpoints are made via `Dial()`, and inbound connections to listening endpoints are returned via `Accept()`.

The `Endpoint` struct is:

```go
// Endpoint represents a transport connection endpoint, either local or remote.
//
// Endpoints are not necessarily networked (see e.g. MemoryTransport) but all
// networked endpoints must use IP as the underlying transport protocol to allow
// e.g. IP address filtering. Either IP or Path (or both) must be set.
type Endpoint struct {
    // Protocol specifies the transport protocol.
    Protocol Protocol

    // IP is an IP address (v4 or v6) to connect to. If set, this defines the
    // endpoint as a networked endpoint.
    IP net.IP

    // Port is a network port (either TCP or UDP). If 0, a default port may be
    // used depending on the protocol.
    Port uint16

    // Path is an optional transport-specific path or identifier.
    Path string
}

// Protocol identifies a transport protocol.
type Protocol string
```

Endpoints are arbitrary transport-specific addresses, but if they are networked they must use IP addresses and thus rely on IP as a fundamental packet routing protocol. This enables policies for address discovery, advertisement, and exchange - for example, an address in the private `192.168.0.0/24` network should only be advertised to peers on that IP network, while the public address `8.8.8.8` may be advertised to all peers. Similarly, any port numbers, if given, must represent TCP and/or UDP port numbers, in order to use [UPnP](https://en.wikipedia.org/wiki/Universal_Plug_and_Play) to autoconfigure e.g. NAT gateways.

Non-networked endpoints (without an IP address) are considered local, and will only be advertised to other peers connecting via the same protocol. For example, the in-memory transport used for testing uses `Endpoint{Protocol: "memory", Path: "foo"}` as an address for the node "foo", and this should only be advertised to other nodes using `Protocol: "memory"`.

#### Connections

A connection represents an established transport connection between two endpoints (i.e. two nodes), which can be used to exchange binary messages with logical channel IDs (corresponding to the higher-level channel IDs used in the router). Connections are set up either via `Transport.Dial()` (outbound) or `Transport.Accept()` (inbound).

Once a connection is established, `Connection.Handshake()` must be called to perform a node handshake, exchanging node info and public keys to verify node identities. Node handshakes should not really be part of the transport layer (they are an application protocol concern); this exists for backwards-compatibility with the existing MConnection protocol, which conflates the two. `NodeInfo` is part of the existing MConnection protocol, but does not appear to be documented in the specification -- refer to the Go codebase for details.

The `Connection` interface is shown below. It omits certain additions that are currently implemented for backwards compatibility with the legacy P2P stack and are planned to be removed before the final release.

```go
// Connection represents an established connection between two endpoints.
type Connection interface {
    // Handshake executes a node handshake with the remote peer. It must be
    // called once the connection is established, and returns the remote peer's
    // node info and public key. The caller is responsible for validation.
    Handshake(context.Context, NodeInfo, crypto.PrivKey) (NodeInfo, crypto.PubKey, error)

    // ReceiveMessage returns the next message received on the connection,
    // blocking until one is available. Returns io.EOF if closed.
    ReceiveMessage() (ChannelID, []byte, error)

    // SendMessage sends a message on the connection. Returns io.EOF if closed.
    SendMessage(ChannelID, []byte) error

    // LocalEndpoint returns the local endpoint for the connection.
    LocalEndpoint() Endpoint

    // RemoteEndpoint returns the remote endpoint for the connection.
    RemoteEndpoint() Endpoint

    // Close closes the connection.
    Close() error
}
```

This ADR initially proposed a byte-oriented multi-stream connection API that follows more typical networking API conventions (using e.g. `io.Reader` and `io.Writer` interfaces which easily compose with other libraries). This would also allow moving the responsibility for message framing, node handshakes, and traffic scheduling to the common router instead of reimplementing this across transports, and would allow making better use of multi-stream protocols such as QUIC. However, this would require minor breaking changes to the MConnection protocol, which were rejected; see [tendermint/spec#227](https://github.com/tendermint/spec/pull/227) for details. This should be revisited when starting work on a QUIC transport.

### Peer Management

Peers are other Tendermint nodes. Each peer is identified by a unique `NodeID` (tied to the node's private key).

#### Peer Addresses

Nodes have one or more `NodeAddress` addresses, expressed as URLs, that they can be reached at. Examples of node addresses might be:

* `mconn://nodeid@host.domain.com:25567/path`
* `memory:nodeid`

Addresses are resolved into one or more transport endpoints, e.g. by resolving DNS hostnames into IP addresses. Peers should always be expressed as address URLs rather than endpoints (which are a lower-level transport construct).

```go
// NodeID is a hex-encoded crypto.Address. It must be lowercased
// (for uniqueness) and of length 40.
type NodeID string

// NodeAddress is a node address URL. It differs from a transport Endpoint in
// that it contains the node's ID, and that the address hostname may be resolved
// into multiple IP addresses (and thus multiple endpoints).
//
// If the URL is opaque, i.e. of the form "scheme:opaque", then the opaque part
// is expected to contain a node ID.
type NodeAddress struct {
    NodeID   NodeID
    Protocol Protocol
    Hostname string
    Port     uint16
    Path     string
}

// ParseNodeAddress parses a node address URL into a NodeAddress, normalizing
// and validating it.
func ParseNodeAddress(urlString string) (NodeAddress, error)

// Resolve resolves a NodeAddress into a set of Endpoints, e.g. by expanding
// out a DNS hostname to IP addresses.
func (a NodeAddress) Resolve(ctx context.Context) ([]Endpoint, error)
```

#### Peer Manager

The P2P stack needs to track a lot of internal state about peers, such as their addresses, connection state, priorities, availability, failures, retries, and so on. This responsibility has been separated out to a `PeerManager`, which tracks this state for the `Router` (but does not maintain the actual transport connections themselves, which is the router's responsibility).

The `PeerManager` is a synchronous state machine, where all state transitions are serialized (implemented as synchronous method calls holding an exclusive mutex lock). Most peer state is intentionally kept internal, stored in a `peerStore` database that persists it as appropriate, and the external interfaces pass the minimum amount of information necessary in order to avoid shared state between router goroutines. This design significantly simplifies the model, making it much easier to reason about and test than if it were baked into the asynchronous ball of concurrency that the P2P networking core must necessarily be. As peer lifecycle events are expected to be relatively infrequent, this should not significantly impact performance either.

The `Router` uses the `PeerManager` to request which peers to dial and evict, and reports in with peer lifecycle events such as connections, disconnections, and failures as they occur. The manager can reject these events (e.g. reject an inbound connection) by returning errors. This happens as follows:

* Outbound connections, via `Transport.Dial`:
    * `DialNext()`: returns a peer address to dial, or blocks until one is available.
    * `DialFailed()`: reports a peer dial failure.
    * `Dialed()`: reports a peer dial success.
    * `Ready()`: reports the peer as routed and ready.
    * `Disconnected()`: reports a peer disconnection.

* Inbound connections, via `Transport.Accept`:
    * `Accepted()`: reports an inbound peer connection.
    * `Ready()`: reports the peer as routed and ready.
    * `Disconnected()`: reports a peer disconnection.

* Evictions, via `Connection.Close`:
    * `EvictNext()`: returns a peer to disconnect, or blocks until one is available.
    * `Disconnected()`: reports a peer disconnection.

These calls have the following interface:

```go
// DialNext returns a peer address to dial, blocking until one is available.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error)

// DialFailed reports a dial failure for the given address.
func (m *PeerManager) DialFailed(address NodeAddress) error

// Dialed reports a successful outbound connection to the given address.
func (m *PeerManager) Dialed(address NodeAddress) error

// Accepted reports a successful inbound connection from the given node.
func (m *PeerManager) Accepted(peerID NodeID) error

// Ready reports the peer as fully routed and ready for use.
func (m *PeerManager) Ready(peerID NodeID) error

// EvictNext returns a peer ID to disconnect, blocking until one is available.
func (m *PeerManager) EvictNext(ctx context.Context) (NodeID, error)

// Disconnected reports a peer disconnection.
func (m *PeerManager) Disconnected(peerID NodeID) error
```

Internally, the `PeerManager` uses a numeric peer score to prioritize peers, e.g. when deciding which peers to dial next. The scoring policy has not yet been implemented, but should take into account e.g. node configuration such as `persistent_peers`, uptime and connection failures, performance, and so on. The manager will also attempt to automatically upgrade to better-scored peers by evicting lower-scored peers when a better one becomes available (e.g. when a persistent peer comes back online after an outage).

The `PeerManager` should also have an API for reporting peer behavior from reactors that affects its score (e.g. signing a block increases the score, double-voting decreases it or even bans the peer), but this has not yet been designed or implemented.

Additionally, the `PeerManager` provides `PeerUpdates` subscriptions that will receive `PeerUpdate` events whenever significant peer state changes happen. Reactors can use these e.g. to know when peers are connected or disconnected, and take appropriate action. This is currently fairly minimal:

```go
// Subscribe subscribes to peer updates. The caller must consume the peer updates
// in a timely fashion and close the subscription when done, to avoid stalling the
// PeerManager as delivery is semi-synchronous, guaranteed, and ordered.
func (m *PeerManager) Subscribe() *PeerUpdates

// PeerUpdate is a peer update event sent via PeerUpdates.
type PeerUpdate struct {
    NodeID NodeID
    Status PeerStatus
}

// PeerStatus is a peer status.
type PeerStatus string

const (
    PeerStatusUp   PeerStatus = "up"   // Connected and ready.
    PeerStatusDown PeerStatus = "down" // Disconnected.
)

// PeerUpdates is a real-time peer update subscription.
type PeerUpdates struct { ... }

// Updates returns a channel for consuming peer updates.
func (pu *PeerUpdates) Updates() <-chan PeerUpdate

// Close closes the peer updates subscription.
func (pu *PeerUpdates) Close()
```

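
A reactor might consume such a subscription along the following lines. The sketch works on a plain `<-chan PeerUpdate` (what `Updates()` would return) so that it is self-contained; `peerSet` and `trackPeers` are hypothetical names, and closing the channel to end the loop is a convenience of this example rather than documented `PeerUpdates` behavior.

```go
package main

import "fmt"

// NodeID, PeerStatus, and PeerUpdate mirror the types shown in the ADR.
type NodeID string
type PeerStatus string

const (
    PeerStatusUp   PeerStatus = "up"
    PeerStatusDown PeerStatus = "down"
)

type PeerUpdate struct {
    NodeID NodeID
    Status PeerStatus
}

// peerSet tracks the currently connected peers.
type peerSet map[NodeID]struct{}

// apply updates the set according to a single peer update.
func (s peerSet) apply(u PeerUpdate) {
    switch u.Status {
    case PeerStatusUp:
        s[u.NodeID] = struct{}{}
    case PeerStatusDown:
        delete(s, u.NodeID)
    }
}

// trackPeers consumes updates promptly (so as not to stall the sender) until
// the updates channel is closed or done is closed, and returns the resulting
// set of connected peers.
func trackPeers(updates <-chan PeerUpdate, done <-chan struct{}) peerSet {
    peers := peerSet{}
    for {
        select {
        case u, ok := <-updates:
            if !ok {
                return peers
            }
            peers.apply(u)
        case <-done:
            return peers
        }
    }
}

func main() {
    updates := make(chan PeerUpdate, 3)
    updates <- PeerUpdate{NodeID: "a", Status: PeerStatusUp}
    updates <- PeerUpdate{NodeID: "b", Status: PeerStatusUp}
    updates <- PeerUpdate{NodeID: "a", Status: PeerStatusDown}
    close(updates)

    peers := trackPeers(updates, nil)
    fmt.Println(len(peers), "peer(s) connected")
}
```
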
The `PeerManager` will also be responsible for providing peer information to the PEX reactor that can be gossiped to other nodes. This requires an improved system for peer address detection and advertisement, that e.g. reliably detects peer and self addresses and only gossips private network addresses to other peers on the same network, but this system has not yet been fully designed or implemented.

### Channels

While low-level data exchange happens via the `Transport`, the high-level API is based on a bidirectional `Channel` that can send and receive Protobuf messages addressed by `NodeID`. A channel is identified by an arbitrary `ChannelID` identifier, and can exchange Protobuf messages of one specific type (since the type to unmarshal into must be predefined). Message delivery is asynchronous and at-most-once.

The channel can also be used to report peer errors, e.g. when receiving an invalid or malicious message. This may cause the peer to be disconnected or banned depending on `PeerManager` policy, but should probably be replaced by a broader peer behavior API that can also report good behavior.

A `Channel` has this interface:

```go
// ChannelID is an arbitrary channel ID.
type ChannelID uint16

// Channel is a bidirectional channel to exchange Protobuf messages with peers.
type Channel struct {
    ID          ChannelID        // Channel ID.
    In          <-chan Envelope  // Inbound messages (peers to reactors).
    Out         chan<- Envelope  // Outbound messages (reactors to peers).
    Error       chan<- PeerError // Peer error reporting.
    messageType proto.Message    // Channel's message type, for e.g. unmarshaling.
}

// Close closes the channel, also closing Out and Error.
func (c *Channel) Close() error

// Envelope specifies the message receiver and sender.
type Envelope struct {
    From      NodeID        // Sender (empty if outbound).
    To        NodeID        // Receiver (empty if inbound).
    Broadcast bool          // Send to all connected peers, ignoring To.
    Message   proto.Message // Message payload.
}

// PeerError is a peer error reported via the Error channel.
type PeerError struct {
    NodeID NodeID
    Err    error
}
```

A channel can reach any connected peer, and will automatically (un)marshal the Protobuf messages. Message scheduling and queueing is a `Router` implementation concern, and can use any number of algorithms such as FIFO, round-robin, priority queues, etc. Since message delivery is not guaranteed, both inbound and outbound messages may be dropped, buffered, reordered, or blocked as appropriate.

Since a channel can only exchange messages of a single type, it is often useful to use a wrapper message type with e.g. a Protobuf `oneof` field that specifies a set of inner message types that it can contain. The channel can automatically perform this (un)wrapping if the outer message type implements the `Wrapper` interface (see [Reactor Example](#reactor-example) for an example):

```go
// Wrapper is a Protobuf message that can contain a variety of inner messages.
// If a Channel's message type implements Wrapper, the channel will
// automatically (un)wrap passed messages using the container type, such that
// the channel can transparently support multiple message types.
type Wrapper interface {
    proto.Message

    // Wrap will take a message and wrap it in this one.
    Wrap(proto.Message) error

    // Unwrap will unwrap the inner message contained in this message.
    Unwrap() (proto.Message, error)
}
```

### Routers

The router executes P2P networking for a node, taking instructions from and reporting events to the `PeerManager`, maintaining transport connections to peers, and routing messages between channels and peers.

Practically all concurrency in the P2P stack has been moved into the router and reactors, while as many other responsibilities as possible have been moved into separate components such as the `Transport` and `PeerManager` that can remain largely synchronous. Limiting concurrency to a single core component makes it much easier to reason about since there is only a single concurrency structure, while the remaining components can be serial, simple, and easily testable.

The `Router` has a very minimal API, since it is mostly driven by `PeerManager` and `Transport` events:

```go
// Router maintains peer transport connections and routes messages between
// peers and channels.
type Router struct {
    // Some details have been omitted below.

    logger      log.Logger
    options     RouterOptions
    nodeInfo    NodeInfo
    privKey     crypto.PrivKey
    peerManager *PeerManager
    transports  []Transport

    peerMtx    sync.RWMutex
    peerQueues map[NodeID]queue

    channelMtx    sync.RWMutex
    channelQueues map[ChannelID]queue
}

// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error)

// Start starts the router, connecting to peers and routing messages.
func (r *Router) Start() error

// Stop stops the router, disconnecting from all peers and stopping message routing.
func (r *Router) Stop() error
```

All Go channel sends in the `Router` and reactors are blocking (the router also selects on signal channels for closure and shutdown). The responsibility for message scheduling, prioritization, backpressure, and load shedding is centralized in a core `queue` interface that is used at contention points (i.e. from all peers to a single channel, and from all channels to a single peer):

```go
// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
// to some policy. Queues are used at contention points, i.e.:
// - Receiving inbound messages to a single channel from all peers.
// - Sending outbound messages to a single peer from all channels.
type queue interface {
    // enqueue returns a channel for submitting envelopes.
    enqueue() chan<- Envelope

    // dequeue returns a channel ordered according to some queueing policy.
    dequeue() <-chan Envelope

    // close closes the queue. After this call enqueue() will block, so the
    // caller must select on closed() as well to avoid blocking forever. The
    // enqueue() and dequeue() channels will not be closed.
    close()

    // closed returns a channel that's closed when the queue is closed.
    closed() <-chan struct{}
}
```

The current implementation is `fifoQueue`, which is a simple unbuffered lossless queue that passes messages in the order they were received and blocks until the message is delivered (i.e. it is a Go channel). The router will need a more sophisticated queueing policy, but this has not yet been implemented.

The internal `Router` goroutine structure and design is described in the `Router` GoDoc, which is included below for reference:

```go
// On startup, three main goroutines are spawned to maintain peer connections:
//
//   dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
//   address to dial and spawns a goroutine that dials the peer, handshakes
//   with it, and begins to route messages if successful.
//
//   acceptPeers(): in a loop, waits for an inbound connection via
//   Transport.Accept() and spawns a goroutine that handshakes with it and
//   begins to route messages if successful.
//
//   evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
//   peer to evict, and disconnects it by closing its message queue.
//
// When a peer is connected, an outbound peer message queue is registered in
// peerQueues, and routePeer() is called to spawn off two additional goroutines:
//
//   sendPeer(): waits for an outbound message from the peerQueues queue,
//   marshals it, and passes it to the peer transport which delivers it.
//
//   receivePeer(): waits for an inbound message from the peer transport,
//   unmarshals it, and passes it to the appropriate inbound channel queue
//   in channelQueues.
//
// When a reactor opens a channel via OpenChannel, an inbound channel message
// queue is registered in channelQueues, and a channel goroutine is spawned:
//
//   routeChannel(): waits for an outbound message from the channel, looks
//   up the recipient peer's outbound message queue in peerQueues, and submits
//   the message to it.
//
// All channel sends in the router are blocking. It is the responsibility of the
// queue interface in peerQueues and channelQueues to prioritize and drop
// messages as appropriate during contention to prevent stalls and ensure good
// quality of service.
```

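
The recipient lookup performed by `routeChannel()` can be illustrated with the sketch below, which resolves an outbound envelope to one peer queue, or to all of them for a broadcast. It is a simplification under stated assumptions: `routeEnvelope` is a hypothetical helper, plain buffered Go channels stand in for the `queue` interface, the payload is a string instead of a `proto.Message`, and the sorting exists only to make the demo deterministic.

```go
package main

import (
    "fmt"
    "sort"
)

type NodeID string

// Envelope mirrors the ADR's type, with a string payload for simplicity.
type Envelope struct {
    From      NodeID
    To        NodeID
    Broadcast bool
    Message   string
}

// routeEnvelope submits an outbound envelope to the outbound queue of its
// recipient, or of every connected peer if Broadcast is set. It returns the
// peers the envelope was queued for. Envelopes addressed to unknown peers
// are dropped, consistent with at-most-once delivery.
func routeEnvelope(e Envelope, peerQueues map[NodeID]chan Envelope) []NodeID {
    var recipients []NodeID
    if e.Broadcast {
        for id := range peerQueues {
            recipients = append(recipients, id)
        }
        // Sorted only so that the demo output is deterministic.
        sort.Slice(recipients, func(i, j int) bool { return recipients[i] < recipients[j] })
    } else if _, ok := peerQueues[e.To]; ok {
        recipients = []NodeID{e.To}
    }

    for _, id := range recipients {
        out := e
        out.To = id
        out.Broadcast = false
        peerQueues[id] <- out // Blocking send, as in the router.
    }
    return recipients
}

func main() {
    peerQueues := map[NodeID]chan Envelope{
        "a": make(chan Envelope, 1),
        "b": make(chan Envelope, 1),
    }

    sent := routeEnvelope(Envelope{Broadcast: true, Message: "ping"}, peerQueues)
    fmt.Println("broadcast queued for:", sent)
    fmt.Println("peer a got:", (<-peerQueues["a"]).Message)
    fmt.Println("peer b got:", (<-peerQueues["b"]).Message)

    sent = routeEnvelope(Envelope{To: "c", Message: "pong"}, peerQueues)
    fmt.Println("unknown peer queued for:", len(sent), "peers")
}
```
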
### Reactor Example

While reactors are a first-class concept in the current P2P stack (i.e. there is an explicit `p2p.Reactor` interface), they will simply be a design pattern in the new stack, loosely defined as "something which listens on a channel and reacts to messages".

Since reactors have very few formal constraints, they can be implemented in a variety of ways. There is currently no recommended pattern for implementing reactors, to avoid overspecification and scope creep in this ADR. However, prototyping and developing a reactor pattern should be done early during implementation, to make sure reactors built using the `Channel` interface can satisfy the needs for convenience, deterministic tests, and reliability.

Below is a trivial example of a simple echo reactor implemented as a function. The reactor will exchange the following Protobuf messages:

```protobuf
message EchoMessage {
    oneof inner {
        PingMessage ping = 1;
        PongMessage pong = 2;
    }
}

message PingMessage {
    string content = 1;
}

message PongMessage {
    string content = 1;
}
```

Implementing the `Wrapper` interface for `EchoMessage` allows transparently passing `PingMessage` and `PongMessage` through the channel, where they will automatically be (un)wrapped in an `EchoMessage`:

```go
// Wrap stores a PingMessage or PongMessage in the oneof field. The generated
// oneof wrapper types are named after the field (EchoMessage_Ping and
// EchoMessage_Pong).
func (m *EchoMessage) Wrap(inner proto.Message) error {
    switch inner := inner.(type) {
    case *PingMessage:
        m.Inner = &EchoMessage_Ping{Ping: inner}
    case *PongMessage:
        m.Inner = &EchoMessage_Pong{Pong: inner}
    default:
        return fmt.Errorf("unknown message %T", inner)
    }
    return nil
}

// Unwrap returns the PingMessage or PongMessage stored in the oneof field.
func (m *EchoMessage) Unwrap() (proto.Message, error) {
    switch inner := m.Inner.(type) {
    case *EchoMessage_Ping:
        return inner.Ping, nil
    case *EchoMessage_Pong:
        return inner.Pong, nil
    default:
        return nil, fmt.Errorf("unknown message %T", inner)
    }
}
```

The reactor itself would be implemented e.g. like this:

```go
// Assumes imports of context, fmt, time, and the p2p package.

// RunEchoReactor wires up an echo reactor to a router and runs it.
func RunEchoReactor(router *p2p.Router, peerManager *p2p.PeerManager) error {
    channel, err := router.OpenChannel(1, &EchoMessage{})
    if err != nil {
        return err
    }
    defer channel.Close()
    peerUpdates := peerManager.Subscribe()
    defer peerUpdates.Close()

    return EchoReactor(context.Background(), channel, peerUpdates)
}

// EchoReactor provides an echo service, pinging all known peers until the given
// context is canceled.
func EchoReactor(ctx context.Context, channel *p2p.Channel, peerUpdates *p2p.PeerUpdates) error {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        // Send ping message to all known peers every 5 seconds.
        case <-ticker.C:
            channel.Out <- p2p.Envelope{
                Broadcast: true,
                Message:   &PingMessage{Content: "👋"},
            }

        // When we receive a message from a peer, either respond to ping, output
        // pong, or report peer error on unknown message type.
        case envelope := <-channel.In:
            switch msg := envelope.Message.(type) {
            case *PingMessage:
                channel.Out <- p2p.Envelope{
                    To:      envelope.From,
                    Message: &PongMessage{Content: msg.Content},
                }

            case *PongMessage:
                fmt.Printf("%q replied with %q\n", envelope.From, msg.Content)

            default:
                channel.Error <- p2p.PeerError{
                    NodeID: envelope.From,
                    Err:    fmt.Errorf("unexpected message %T", msg),
                }
            }

        // Output info about any peer status changes.
        case peerUpdate := <-peerUpdates.Updates():
            fmt.Printf("Peer %q changed status to %q\n", peerUpdate.NodeID, peerUpdate.Status)

        // Exit when context is canceled.
        case <-ctx.Done():
            return nil
        }
    }
}
```

## Status

Partially implemented ([#5670](https://github.com/tendermint/tendermint/issues/5670))

## Consequences

### Positive

* Reduced coupling and simplified interfaces should lead to better understandability, increased reliability, and more testing.
* Using message passing via Go channels gives better control of backpressure and quality-of-service scheduling.
* Peer lifecycle and connection management is centralized in a single entity, making it easier to reason about.
* Detection, advertisement, and exchange of node addresses will be improved.
* Additional transports (e.g. QUIC) can be implemented and used in parallel with the existing MConn protocol.
* The P2P protocol will not be broken in the initial version, if possible.

### Negative

* Fully implementing the new design as intended is likely to require breaking changes to the P2P protocol at some point, although the initial implementation shouldn't.
* Gradually migrating the existing stack and maintaining backwards-compatibility will be more labor-intensive than simply replacing the entire stack.
* A complete overhaul of P2P internals is likely to cause temporary performance regressions and bugs as the implementation matures.
* Hiding peer management information inside the `PeerManager` may prevent certain functionality or require additional deliberate interfaces for information exchange, as a tradeoff to simplify the design, reduce coupling, and avoid race conditions and lock contention.

### Neutral

* Implementation details around e.g. peer management, message scheduling, and peer and endpoint advertisement are not yet determined.

## References

* [ADR 061: P2P Refactor Scope](adr-061-p2p-refactor-scope.md)
* [#5670 p2p: internal refactor and architecture redesign](https://github.com/tendermint/tendermint/issues/5670)