You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

324 lines
9.7 KiB

  1. package p2p
  2. import (
  3. "errors"
  4. "sort"
  5. "github.com/gogo/protobuf/proto"
  6. )
  7. // ============================================================================
  8. // TODO: Types and business logic below are temporary and will be removed once
  9. // the legacy p2p stack is removed in favor of the new model.
  10. //
  11. // ref: https://github.com/tendermint/tendermint/issues/5670
  12. // ============================================================================
  13. var _ Reactor = (*ReactorShim)(nil)
  14. type (
  15. messageValidator interface {
  16. Validate() error
  17. }
  18. // ReactorShim defines a generic shim wrapper around a BaseReactor. It is
  19. // responsible for wiring up legacy p2p behavior to the new p2p semantics
  20. // (e.g. proxying Envelope messages to legacy peers).
  21. ReactorShim struct {
  22. BaseReactor
  23. Name string
  24. PeerUpdates *PeerUpdatesCh
  25. Channels map[ChannelID]*ChannelShim
  26. }
  27. // ChannelShim defines a generic shim wrapper around a legacy p2p channel
  28. // and the new p2p Channel. It also includes the raw bi-directional Go channels
  29. // so we can proxy message delivery.
  30. ChannelShim struct {
  31. Descriptor *ChannelDescriptor
  32. Channel *Channel
  33. }
  34. // ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
  35. // and the proto.Message the new p2p Channel is responsible for handling.
  36. // A ChannelDescriptorShim is not contained in ReactorShim, but is rather
  37. // used to construct a ReactorShim.
  38. ChannelDescriptorShim struct {
  39. MsgType proto.Message
  40. Descriptor *ChannelDescriptor
  41. }
  42. )
  43. func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
  44. channels := make(map[ChannelID]*ChannelShim)
  45. for _, cds := range descriptors {
  46. chShim := NewChannelShim(cds, 0)
  47. channels[chShim.Channel.id] = chShim
  48. }
  49. rs := &ReactorShim{
  50. Name: name,
  51. PeerUpdates: NewPeerUpdates(),
  52. Channels: channels,
  53. }
  54. rs.BaseReactor = *NewBaseReactor(name, rs)
  55. return rs
  56. }
  57. func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
  58. return &ChannelShim{
  59. Descriptor: cds.Descriptor,
  60. Channel: NewChannel(
  61. ChannelID(cds.Descriptor.ID),
  62. cds.MsgType,
  63. make(chan Envelope, buf),
  64. make(chan Envelope, buf),
  65. make(chan PeerError, buf),
  66. ),
  67. }
  68. }
  69. // proxyPeerEnvelopes iterates over each p2p Channel and starts a separate
  70. // go-routine where we listen for outbound envelopes sent during Receive
  71. // executions (or anything else that may send on the Channel) and proxy them to
  72. // the corresponding Peer using the To field from the envelope.
  73. func (rs *ReactorShim) proxyPeerEnvelopes() {
  74. for _, cs := range rs.Channels {
  75. go func(cs *ChannelShim) {
  76. for e := range cs.Channel.outCh {
  77. msg := proto.Clone(cs.Channel.messageType)
  78. msg.Reset()
  79. wrapper, ok := msg.(Wrapper)
  80. if ok {
  81. if err := wrapper.Wrap(e.Message); err != nil {
  82. rs.Logger.Error(
  83. "failed to proxy envelope; failed to wrap message",
  84. "ch_id", cs.Descriptor.ID,
  85. "msg", e.Message,
  86. "err", err,
  87. )
  88. continue
  89. }
  90. } else {
  91. msg = e.Message
  92. }
  93. bz, err := proto.Marshal(msg)
  94. if err != nil {
  95. rs.Logger.Error(
  96. "failed to proxy envelope; failed to encode message",
  97. "ch_id", cs.Descriptor.ID,
  98. "msg", e.Message,
  99. "err", err,
  100. )
  101. continue
  102. }
  103. switch {
  104. case e.Broadcast:
  105. rs.Switch.Broadcast(cs.Descriptor.ID, bz)
  106. case e.To != "":
  107. src := rs.Switch.peers.Get(e.To)
  108. if src == nil {
  109. rs.Logger.Error(
  110. "failed to proxy envelope; failed to find peer",
  111. "ch_id", cs.Descriptor.ID,
  112. "msg", e.Message,
  113. "peer", e.To,
  114. )
  115. continue
  116. }
  117. if !src.Send(cs.Descriptor.ID, bz) {
  118. rs.Logger.Error(
  119. "failed to proxy message to peer",
  120. "ch_id", cs.Descriptor.ID,
  121. "msg", e.Message,
  122. "peer", e.To,
  123. )
  124. }
  125. default:
  126. rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID, "msg", e.Message)
  127. }
  128. }
  129. }(cs)
  130. }
  131. }
  132. // handlePeerErrors iterates over each p2p Channel and starts a separate go-routine
  133. // where we listen for peer errors. For each peer error, we find the peer from
  134. // the legacy p2p Switch and execute a StopPeerForError call with the corresponding
  135. // peer error.
  136. func (rs *ReactorShim) handlePeerErrors() {
  137. for _, cs := range rs.Channels {
  138. go func(cs *ChannelShim) {
  139. for pErr := range cs.Channel.errCh {
  140. if pErr.PeerID != "" {
  141. peer := rs.Switch.peers.Get(pErr.PeerID)
  142. if peer == nil {
  143. rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID)
  144. continue
  145. }
  146. rs.Switch.StopPeerForError(peer, pErr.Err)
  147. }
  148. }
  149. }(cs)
  150. }
  151. }
  152. // OnStart executes the reactor shim's OnStart hook where we start all the
  153. // necessary go-routines in order to proxy peer envelopes and errors per p2p
  154. // Channel.
  155. func (rs *ReactorShim) OnStart() error {
  156. if rs.Switch == nil {
  157. return errors.New("proxyPeerEnvelopes: reactor shim switch is nil")
  158. }
  159. // start envelope proxying and peer error handling in separate go routines
  160. rs.proxyPeerEnvelopes()
  161. rs.handlePeerErrors()
  162. return nil
  163. }
  164. // GetChannel returns a p2p Channel reference for a given ChannelID. If no
  165. // Channel exists, nil is returned.
  166. func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
  167. channelShim, ok := rs.Channels[cID]
  168. if ok {
  169. return channelShim.Channel
  170. }
  171. return nil
  172. }
  173. // GetChannels implements the legacy Reactor interface for getting a slice of all
  174. // the supported ChannelDescriptors.
  175. func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
  176. sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
  177. for cID := range rs.Channels {
  178. sortedChIDs = append(sortedChIDs, cID)
  179. }
  180. sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })
  181. descriptors := make([]*ChannelDescriptor, len(rs.Channels))
  182. for i, cID := range sortedChIDs {
  183. descriptors[i] = rs.Channels[cID].Descriptor
  184. }
  185. return descriptors
  186. }
  187. // AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdateCh.
  188. // The embedding reactor must be sure to listen for messages on this channel to
  189. // handle adding a peer.
  190. func (rs *ReactorShim) AddPeer(peer Peer) {
  191. select {
  192. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusUp}:
  193. rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)
  194. case <-rs.PeerUpdates.Done():
  195. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  196. // This is because there may be numerous spawned goroutines that are
  197. // attempting to send on the updateCh go channel and when the reactor stops
  198. // we do not want to preemptively close the channel as that could result in
  199. // panics sending on a closed channel. This also means that reactors MUST
  200. // be certain there are NO listeners on the updateCh channel when closing or
  201. // stopping.
  202. }
  203. }
  204. // RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdateCh.
  205. // The embedding reactor must be sure to listen for messages on this channel to
  206. // handle removing a peer.
  207. func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
  208. select {
  209. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusDown}:
  210. rs.Logger.Debug(
  211. "sent peer update",
  212. "reactor", rs.Name,
  213. "peer", peer.ID(),
  214. "reason", reason,
  215. "status", PeerStatusDown,
  216. )
  217. case <-rs.PeerUpdates.Done():
  218. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  219. // This is because there may be numerous spawned goroutines that are
  220. // attempting to send on the updateCh go channel and when the reactor stops
  221. // we do not want to preemptively close the channel as that could result in
  222. // panics sending on a closed channel. This also means that reactors MUST
  223. // be certain there are NO listeners on the updateCh channel when closing or
  224. // stopping.
  225. }
  226. }
  227. // Receive implements a generic wrapper around implementing the Receive method
  228. // on the legacy Reactor p2p interface. If the reactor is running, Receive will
  229. // find the corresponding new p2p Channel, create and decode the appropriate
  230. // proto.Message from the msgBytes, execute any validation and finally construct
  231. // and send a p2p Envelope on the appropriate p2p Channel.
  232. func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
  233. if !rs.IsRunning() {
  234. return
  235. }
  236. cID := ChannelID(chID)
  237. channelShim, ok := rs.Channels[cID]
  238. if !ok {
  239. rs.Logger.Error("unexpected channel", "peer", src, "ch_id", chID)
  240. return
  241. }
  242. msg := proto.Clone(channelShim.Channel.messageType)
  243. msg.Reset()
  244. if err := proto.Unmarshal(msgBytes, msg); err != nil {
  245. rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
  246. rs.Switch.StopPeerForError(src, err)
  247. return
  248. }
  249. validator, ok := msg.(messageValidator)
  250. if ok {
  251. if err := validator.Validate(); err != nil {
  252. rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
  253. rs.Switch.StopPeerForError(src, err)
  254. return
  255. }
  256. }
  257. wrapper, ok := msg.(Wrapper)
  258. if ok {
  259. var err error
  260. msg, err = wrapper.Unwrap()
  261. if err != nil {
  262. rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "msg", msg, "err", err)
  263. return
  264. }
  265. }
  266. select {
  267. case channelShim.Channel.inCh <- Envelope{From: src.ID(), Message: msg}:
  268. rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID())
  269. case <-channelShim.Channel.Done():
  270. // NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel.
  271. // This is because there may be numerous spawned goroutines that are
  272. // attempting to send on the inbound channel and when the reactor stops we
  273. // do not want to preemptively close the channel as that could result in
  274. // panics sending on a closed channel. This also means that reactors MUST
  275. // be certain there are NO listeners on the inbound channel when closing or
  276. // stopping.
  277. }
  278. }