You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
9.8 KiB

  1. package p2p
  2. import (
  3. "errors"
  4. "sort"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/tendermint/tendermint/libs/log"
  7. )
  8. // ============================================================================
  9. // TODO: Types and business logic below are temporary and will be removed once
  10. // the legacy p2p stack is removed in favor of the new model.
  11. //
  12. // ref: https://github.com/tendermint/tendermint/issues/5670
  13. // ============================================================================
// Compile-time assertion that ReactorShim satisfies the legacy Reactor interface.
var _ Reactor = (*ReactorShim)(nil)

type (
	// messageValidator is implemented by decoded messages that can perform
	// stateless self-validation; Receive calls Validate before delivery.
	messageValidator interface {
		Validate() error
	}

	// ReactorShim defines a generic shim wrapper around a BaseReactor. It is
	// responsible for wiring up legacy p2p behavior to the new p2p semantics
	// (e.g. proxying Envelope messages to legacy peers).
	ReactorShim struct {
		BaseReactor

		Name        string                      // reactor name, also used when registering the BaseReactor
		PeerUpdates *PeerUpdatesCh              // peer up/down notifications forwarded from AddPeer/RemovePeer
		Channels    map[ChannelID]*ChannelShim  // per-channel shims keyed by the new-stack channel ID
	}

	// ChannelShim defines a generic shim wrapper around a legacy p2p channel
	// and the new p2p Channel. It also includes the raw bi-directional Go channels
	// so we can proxy message delivery.
	ChannelShim struct {
		Descriptor *ChannelDescriptor
		Channel    *Channel
	}

	// ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
	// and the proto.Message the new p2p Channel is responsible for handling.
	// A ChannelDescriptorShim is not contained in ReactorShim, but is rather
	// used to construct a ReactorShim.
	ChannelDescriptorShim struct {
		MsgType    proto.Message
		Descriptor *ChannelDescriptor
	}
)
  44. func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
  45. channels := make(map[ChannelID]*ChannelShim)
  46. for _, cds := range descriptors {
  47. chShim := NewChannelShim(cds, 0)
  48. channels[chShim.Channel.id] = chShim
  49. }
  50. rs := &ReactorShim{
  51. Name: name,
  52. PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
  53. Channels: channels,
  54. }
  55. rs.BaseReactor = *NewBaseReactor(name, rs)
  56. rs.SetLogger(logger)
  57. return rs
  58. }
  59. func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
  60. return &ChannelShim{
  61. Descriptor: cds.Descriptor,
  62. Channel: NewChannel(
  63. ChannelID(cds.Descriptor.ID),
  64. cds.MsgType,
  65. make(chan Envelope, buf),
  66. make(chan Envelope, buf),
  67. make(chan PeerError, buf),
  68. ),
  69. }
  70. }
  71. // proxyPeerEnvelopes iterates over each p2p Channel and starts a separate
  72. // go-routine where we listen for outbound envelopes sent during Receive
  73. // executions (or anything else that may send on the Channel) and proxy them to
  74. // the corresponding Peer using the To field from the envelope.
  75. func (rs *ReactorShim) proxyPeerEnvelopes() {
  76. for _, cs := range rs.Channels {
  77. go func(cs *ChannelShim) {
  78. for e := range cs.Channel.outCh {
  79. msg := proto.Clone(cs.Channel.messageType)
  80. msg.Reset()
  81. wrapper, ok := msg.(Wrapper)
  82. if ok {
  83. if err := wrapper.Wrap(e.Message); err != nil {
  84. rs.Logger.Error(
  85. "failed to proxy envelope; failed to wrap message",
  86. "ch_id", cs.Descriptor.ID,
  87. "msg", e.Message,
  88. "err", err,
  89. )
  90. continue
  91. }
  92. } else {
  93. msg = e.Message
  94. }
  95. bz, err := proto.Marshal(msg)
  96. if err != nil {
  97. rs.Logger.Error(
  98. "failed to proxy envelope; failed to encode message",
  99. "ch_id", cs.Descriptor.ID,
  100. "msg", e.Message,
  101. "err", err,
  102. )
  103. continue
  104. }
  105. switch {
  106. case e.Broadcast:
  107. rs.Switch.Broadcast(cs.Descriptor.ID, bz)
  108. case e.To != "":
  109. src := rs.Switch.peers.Get(e.To)
  110. if src == nil {
  111. rs.Logger.Debug(
  112. "failed to proxy envelope; failed to find peer",
  113. "ch_id", cs.Descriptor.ID,
  114. "msg", e.Message,
  115. "peer", e.To,
  116. )
  117. continue
  118. }
  119. if !src.Send(cs.Descriptor.ID, bz) {
  120. rs.Logger.Error(
  121. "failed to proxy message to peer",
  122. "ch_id", cs.Descriptor.ID,
  123. "msg", e.Message,
  124. "peer", e.To,
  125. )
  126. }
  127. default:
  128. rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID, "msg", e.Message)
  129. }
  130. }
  131. }(cs)
  132. }
  133. }
  134. // handlePeerErrors iterates over each p2p Channel and starts a separate go-routine
  135. // where we listen for peer errors. For each peer error, we find the peer from
  136. // the legacy p2p Switch and execute a StopPeerForError call with the corresponding
  137. // peer error.
  138. func (rs *ReactorShim) handlePeerErrors() {
  139. for _, cs := range rs.Channels {
  140. go func(cs *ChannelShim) {
  141. for pErr := range cs.Channel.errCh {
  142. if pErr.PeerID != "" {
  143. peer := rs.Switch.peers.Get(pErr.PeerID)
  144. if peer == nil {
  145. rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID)
  146. continue
  147. }
  148. rs.Switch.StopPeerForError(peer, pErr.Err)
  149. }
  150. }
  151. }(cs)
  152. }
  153. }
  154. // OnStart executes the reactor shim's OnStart hook where we start all the
  155. // necessary go-routines in order to proxy peer envelopes and errors per p2p
  156. // Channel.
  157. func (rs *ReactorShim) OnStart() error {
  158. if rs.Switch == nil {
  159. return errors.New("proxyPeerEnvelopes: reactor shim switch is nil")
  160. }
  161. // start envelope proxying and peer error handling in separate go routines
  162. rs.proxyPeerEnvelopes()
  163. rs.handlePeerErrors()
  164. return nil
  165. }
  166. // GetChannel returns a p2p Channel reference for a given ChannelID. If no
  167. // Channel exists, nil is returned.
  168. func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
  169. channelShim, ok := rs.Channels[cID]
  170. if ok {
  171. return channelShim.Channel
  172. }
  173. return nil
  174. }
  175. // GetChannels implements the legacy Reactor interface for getting a slice of all
  176. // the supported ChannelDescriptors.
  177. func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
  178. sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
  179. for cID := range rs.Channels {
  180. sortedChIDs = append(sortedChIDs, cID)
  181. }
  182. sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })
  183. descriptors := make([]*ChannelDescriptor, len(rs.Channels))
  184. for i, cID := range sortedChIDs {
  185. descriptors[i] = rs.Channels[cID].Descriptor
  186. }
  187. return descriptors
  188. }
  189. // AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdateCh.
  190. // The embedding reactor must be sure to listen for messages on this channel to
  191. // handle adding a peer.
  192. func (rs *ReactorShim) AddPeer(peer Peer) {
  193. select {
  194. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusUp}:
  195. rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)
  196. case <-rs.PeerUpdates.Done():
  197. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  198. // This is because there may be numerous spawned goroutines that are
  199. // attempting to send on the updateCh go channel and when the reactor stops
  200. // we do not want to preemptively close the channel as that could result in
  201. // panics sending on a closed channel. This also means that reactors MUST
  202. // be certain there are NO listeners on the updateCh channel when closing or
  203. // stopping.
  204. }
  205. }
  206. // RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdateCh.
  207. // The embedding reactor must be sure to listen for messages on this channel to
  208. // handle removing a peer.
  209. func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
  210. select {
  211. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusDown}:
  212. rs.Logger.Debug(
  213. "sent peer update",
  214. "reactor", rs.Name,
  215. "peer", peer.ID(),
  216. "reason", reason,
  217. "status", PeerStatusDown,
  218. )
  219. case <-rs.PeerUpdates.Done():
  220. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  221. // This is because there may be numerous spawned goroutines that are
  222. // attempting to send on the updateCh go channel and when the reactor stops
  223. // we do not want to preemptively close the channel as that could result in
  224. // panics sending on a closed channel. This also means that reactors MUST
  225. // be certain there are NO listeners on the updateCh channel when closing or
  226. // stopping.
  227. }
  228. }
// Receive implements a generic wrapper around implementing the Receive method
// on the legacy Reactor p2p interface. If the reactor is running, Receive will
// find the corresponding new p2p Channel, create and decode the appropriate
// proto.Message from the msgBytes, execute any validation and finally construct
// and send a p2p Envelope on the appropriate p2p Channel.
func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
	if !rs.IsRunning() {
		return
	}

	cID := ChannelID(chID)
	channelShim, ok := rs.Channels[cID]
	if !ok {
		rs.Logger.Error("unexpected channel", "peer", src, "ch_id", chID)
		return
	}

	// Clone the channel's template message and reset it so decoding starts
	// from a zero value of the expected concrete type.
	msg := proto.Clone(channelShim.Channel.messageType)
	msg.Reset()

	// A decode failure stops the offending peer.
	if err := proto.Unmarshal(msgBytes, msg); err != nil {
		rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
		rs.Switch.StopPeerForError(src, err)
		return
	}

	// Run optional stateless validation if the decoded message supports it;
	// an invalid message is treated like a decode failure (peer is stopped).
	validator, ok := msg.(messageValidator)
	if ok {
		if err := validator.Validate(); err != nil {
			rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
			rs.Switch.StopPeerForError(src, err)
			return
		}
	}

	// If the decoded message is a wrapper, unwrap to the inner payload before
	// delivery. NOTE(review): unlike the decode/validate failures above, an
	// unwrap failure does NOT stop the peer — confirm this asymmetry is
	// intentional. Also note this log uses the raw byte chID where the
	// others use cID.
	wrapper, ok := msg.(Wrapper)
	if ok {
		var err error
		msg, err = wrapper.Unwrap()
		if err != nil {
			rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "msg", msg, "err", err)
			return
		}
	}

	select {
	case channelShim.Channel.inCh <- Envelope{From: src.ID(), Message: msg}:
		rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID())

	case <-channelShim.Channel.Done():
		// NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel.
		// This is because there may be numerous spawned goroutines that are
		// attempting to send on the inbound channel and when the reactor stops we
		// do not want to preemptively close the channel as that could result in
		// panics sending on a closed channel. This also means that reactors MUST
		// be certain there are NO listeners on the inbound channel when closing or
		// stopping.
	}
}