You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

216 lines
6.3 KiB

  1. package p2p
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "github.com/gogo/protobuf/proto"
  7. "github.com/tendermint/tendermint/types"
  8. )
  9. // Envelope contains a message with sender/receiver routing info.
  10. type Envelope struct {
  11. From types.NodeID // sender (empty if outbound)
  12. To types.NodeID // receiver (empty if inbound)
  13. Broadcast bool // send to all connected peers (ignores To)
  14. Message proto.Message // message payload
  15. // channelID is for internal Router use, set on outbound messages to inform
  16. // the sendPeer() goroutine which transport channel to use.
  17. //
  18. // FIXME: If we migrate the Transport API to a byte-oriented multi-stream
  19. // API, this will no longer be necessary since each channel will be mapped
  20. // onto a stream during channel/peer setup. See:
  21. // https://github.com/tendermint/spec/pull/227
  22. channelID ChannelID
  23. }
  24. // Wrapper is a Protobuf message that can contain a variety of inner messages
  25. // (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
  26. // Router will automatically wrap outbound messages and unwrap inbound messages,
  27. // such that reactors do not have to do this themselves.
  28. type Wrapper interface {
  29. proto.Message
  30. // Wrap will take a message and wrap it in this one if possible.
  31. Wrap(proto.Message) error
  32. // Unwrap will unwrap the inner message contained in this message.
  33. Unwrap() (proto.Message, error)
  34. }
  35. // PeerError is a peer error reported via Channel.Error.
  36. //
  37. // FIXME: This currently just disconnects the peer, which is too simplistic.
  38. // For example, some errors should be logged, some should cause disconnects,
  39. // and some should ban the peer.
  40. //
  41. // FIXME: This should probably be replaced by a more general PeerBehavior
  42. // concept that can mark good and bad behavior and contributes to peer scoring.
  43. // It should possibly also allow reactors to request explicit actions, e.g.
  44. // disconnection or banning, in addition to doing this based on aggregates.
  45. type PeerError struct {
  46. NodeID types.NodeID
  47. Err error
  48. }
  49. func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
  50. func (pe PeerError) Unwrap() error { return pe.Err }
  51. // Channel is a bidirectional channel to exchange Protobuf messages with peers.
  52. // Each message is wrapped in an Envelope to specify its sender and receiver.
  53. type Channel struct {
  54. ID ChannelID
  55. In <-chan Envelope // inbound messages (peers to reactors)
  56. Out chan<- Envelope // outbound messages (reactors to peers)
  57. errCh chan<- PeerError // peer error reporting
  58. messageType proto.Message // the channel's message type, used for unmarshaling
  59. }
  60. // NewChannel creates a new channel. It is primarily for internal and test
  61. // use, reactors should use Router.OpenChannel().
  62. func NewChannel(
  63. id ChannelID,
  64. messageType proto.Message,
  65. inCh <-chan Envelope,
  66. outCh chan<- Envelope,
  67. errCh chan<- PeerError,
  68. ) *Channel {
  69. return &Channel{
  70. ID: id,
  71. messageType: messageType,
  72. In: inCh,
  73. Out: outCh,
  74. errCh: errCh,
  75. }
  76. }
  77. // Send blocks until the envelope has been sent, or until ctx ends.
  78. // An error only occurs if the context ends before the send completes.
  79. func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
  80. select {
  81. case <-ctx.Done():
  82. return ctx.Err()
  83. case ch.Out <- envelope:
  84. return nil
  85. }
  86. }
  87. // SendError blocks until the given error has been sent, or ctx ends.
  88. // An error only occurs if the context ends before the send completes.
  89. func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
  90. select {
  91. case <-ctx.Done():
  92. return ctx.Err()
  93. case ch.errCh <- pe:
  94. return nil
  95. }
  96. }
  97. // Receive returns a new unbuffered iterator to receive messages from ch.
  98. // The iterator runs until ctx ends.
  99. func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
  100. iter := &ChannelIterator{
  101. pipe: make(chan Envelope), // unbuffered
  102. }
  103. go func() {
  104. defer close(iter.pipe)
  105. iteratorWorker(ctx, ch, iter.pipe)
  106. }()
  107. return iter
  108. }
  109. // ChannelIterator provides a context-aware path for callers
  110. // (reactors) to process messages from the P2P layer without relying
  111. // on the implementation details of the P2P layer. Channel provides
  112. // access to it's Outbound stream as an iterator, and the
  113. // MergedChannelIterator makes it possible to combine multiple
  114. // channels into a single iterator.
  115. type ChannelIterator struct {
  116. pipe chan Envelope
  117. current *Envelope
  118. }
  119. func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. return
  124. case envelope := <-ch.In:
  125. select {
  126. case <-ctx.Done():
  127. return
  128. case pipe <- envelope:
  129. }
  130. }
  131. }
  132. }
  133. // Next returns true when the Envelope value has advanced, and false
  134. // when the context is canceled or iteration should stop. If an iterator has returned false,
  135. // it will never return true again.
  136. // in general, use Next, as in:
  137. //
  138. // for iter.Next(ctx) {
  139. // envelope := iter.Envelope()
  140. // // ... do things ...
  141. // }
  142. //
  143. func (iter *ChannelIterator) Next(ctx context.Context) bool {
  144. select {
  145. case <-ctx.Done():
  146. iter.current = nil
  147. return false
  148. case envelope, ok := <-iter.pipe:
  149. if !ok {
  150. iter.current = nil
  151. return false
  152. }
  153. iter.current = &envelope
  154. return true
  155. }
  156. }
  157. // Envelope returns the current Envelope object held by the
  158. // iterator. When the last call to Next returned true, Envelope will
  159. // return a non-nil object. If Next returned false then Envelope is
  160. // always nil.
  161. func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
  162. // MergedChannelIterator produces an iterator that merges the
  163. // messages from the given channels in arbitrary order.
  164. //
  165. // This allows the caller to consume messages from multiple channels
  166. // without needing to manage the concurrency separately.
  167. func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
  168. iter := &ChannelIterator{
  169. pipe: make(chan Envelope), // unbuffered
  170. }
  171. wg := new(sync.WaitGroup)
  172. done := make(chan struct{})
  173. go func() { defer close(done); wg.Wait() }()
  174. go func() {
  175. defer close(iter.pipe)
  176. // we could return early if the context is canceled,
  177. // but this is safer because it means the pipe stays
  178. // open until all of the ch worker threads end, which
  179. // should happen very quickly.
  180. <-done
  181. }()
  182. for _, ch := range chs {
  183. wg.Add(1)
  184. go func(ch *Channel) {
  185. defer wg.Done()
  186. iteratorWorker(ctx, ch, iter.pipe)
  187. }(ch)
  188. }
  189. return iter
  190. }