You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

208 lines
5.9 KiB

  1. package p2p
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "github.com/gogo/protobuf/proto"
  7. "github.com/tendermint/tendermint/types"
  8. )
  9. // Envelope contains a message with sender/receiver routing info.
  10. type Envelope struct {
  11. From types.NodeID // sender (empty if outbound)
  12. To types.NodeID // receiver (empty if inbound)
  13. Broadcast bool // send to all connected peers (ignores To)
  14. Message proto.Message // message payload
  15. ChannelID ChannelID
  16. }
  17. // Wrapper is a Protobuf message that can contain a variety of inner messages
  18. // (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
  19. // Router will automatically wrap outbound messages and unwrap inbound messages,
  20. // such that reactors do not have to do this themselves.
  21. type Wrapper interface {
  22. proto.Message
  23. // Wrap will take a message and wrap it in this one if possible.
  24. Wrap(proto.Message) error
  25. // Unwrap will unwrap the inner message contained in this message.
  26. Unwrap() (proto.Message, error)
  27. }
  28. // PeerError is a peer error reported via Channel.Error.
  29. //
  30. // FIXME: This currently just disconnects the peer, which is too simplistic.
  31. // For example, some errors should be logged, some should cause disconnects,
  32. // and some should ban the peer.
  33. //
  34. // FIXME: This should probably be replaced by a more general PeerBehavior
  35. // concept that can mark good and bad behavior and contributes to peer scoring.
  36. // It should possibly also allow reactors to request explicit actions, e.g.
  37. // disconnection or banning, in addition to doing this based on aggregates.
  38. type PeerError struct {
  39. NodeID types.NodeID
  40. Err error
  41. }
  42. func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
  43. func (pe PeerError) Unwrap() error { return pe.Err }
  44. // Channel is a bidirectional channel to exchange Protobuf messages with peers.
  45. // Each message is wrapped in an Envelope to specify its sender and receiver.
  46. type Channel struct {
  47. ID ChannelID
  48. inCh <-chan Envelope // inbound messages (peers to reactors)
  49. outCh chan<- Envelope // outbound messages (reactors to peers)
  50. errCh chan<- PeerError // peer error reporting
  51. messageType proto.Message // the channel's message type, used for unmarshaling
  52. }
  53. // NewChannel creates a new channel. It is primarily for internal and test
  54. // use, reactors should use Router.OpenChannel().
  55. func NewChannel(
  56. id ChannelID,
  57. messageType proto.Message,
  58. inCh <-chan Envelope,
  59. outCh chan<- Envelope,
  60. errCh chan<- PeerError,
  61. ) *Channel {
  62. return &Channel{
  63. ID: id,
  64. messageType: messageType,
  65. inCh: inCh,
  66. outCh: outCh,
  67. errCh: errCh,
  68. }
  69. }
  70. // Send blocks until the envelope has been sent, or until ctx ends.
  71. // An error only occurs if the context ends before the send completes.
  72. func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
  73. select {
  74. case <-ctx.Done():
  75. return ctx.Err()
  76. case ch.outCh <- envelope:
  77. return nil
  78. }
  79. }
  80. // SendError blocks until the given error has been sent, or ctx ends.
  81. // An error only occurs if the context ends before the send completes.
  82. func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
  83. select {
  84. case <-ctx.Done():
  85. return ctx.Err()
  86. case ch.errCh <- pe:
  87. return nil
  88. }
  89. }
  90. // Receive returns a new unbuffered iterator to receive messages from ch.
  91. // The iterator runs until ctx ends.
  92. func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
  93. iter := &ChannelIterator{
  94. pipe: make(chan Envelope), // unbuffered
  95. }
  96. go func() {
  97. defer close(iter.pipe)
  98. iteratorWorker(ctx, ch, iter.pipe)
  99. }()
  100. return iter
  101. }
  102. // ChannelIterator provides a context-aware path for callers
  103. // (reactors) to process messages from the P2P layer without relying
  104. // on the implementation details of the P2P layer. Channel provides
  105. // access to it's Outbound stream as an iterator, and the
  106. // MergedChannelIterator makes it possible to combine multiple
  107. // channels into a single iterator.
  108. type ChannelIterator struct {
  109. pipe chan Envelope
  110. current *Envelope
  111. }
  112. func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
  113. for {
  114. select {
  115. case <-ctx.Done():
  116. return
  117. case envelope := <-ch.inCh:
  118. select {
  119. case <-ctx.Done():
  120. return
  121. case pipe <- envelope:
  122. }
  123. }
  124. }
  125. }
  126. // Next returns true when the Envelope value has advanced, and false
  127. // when the context is canceled or iteration should stop. If an iterator has returned false,
  128. // it will never return true again.
  129. // in general, use Next, as in:
  130. //
  131. // for iter.Next(ctx) {
  132. // envelope := iter.Envelope()
  133. // // ... do things ...
  134. // }
  135. //
  136. func (iter *ChannelIterator) Next(ctx context.Context) bool {
  137. select {
  138. case <-ctx.Done():
  139. iter.current = nil
  140. return false
  141. case envelope, ok := <-iter.pipe:
  142. if !ok {
  143. iter.current = nil
  144. return false
  145. }
  146. iter.current = &envelope
  147. return true
  148. }
  149. }
  150. // Envelope returns the current Envelope object held by the
  151. // iterator. When the last call to Next returned true, Envelope will
  152. // return a non-nil object. If Next returned false then Envelope is
  153. // always nil.
  154. func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
  155. // MergedChannelIterator produces an iterator that merges the
  156. // messages from the given channels in arbitrary order.
  157. //
  158. // This allows the caller to consume messages from multiple channels
  159. // without needing to manage the concurrency separately.
  160. func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
  161. iter := &ChannelIterator{
  162. pipe: make(chan Envelope), // unbuffered
  163. }
  164. wg := new(sync.WaitGroup)
  165. for _, ch := range chs {
  166. wg.Add(1)
  167. go func(ch *Channel) {
  168. defer wg.Done()
  169. iteratorWorker(ctx, ch, iter.pipe)
  170. }(ch)
  171. }
  172. done := make(chan struct{})
  173. go func() { defer close(done); wg.Wait() }()
  174. go func() {
  175. defer close(iter.pipe)
  176. // we could return early if the context is canceled,
  177. // but this is safer because it means the pipe stays
  178. // open until all of the ch worker threads end, which
  179. // should happen very quickly.
  180. <-done
  181. }()
  182. return iter
  183. }