You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

342 lines
10 KiB

  1. package p2p
  2. import (
  3. "errors"
  4. "sort"
  5. "github.com/gogo/protobuf/proto"
  6. )
  7. // ============================================================================
  8. // TODO: Types and business logic below are temporary and will be removed once
  9. // the legacy p2p stack is removed in favor of the new model.
  10. //
  11. // ref: https://github.com/tendermint/tendermint/issues/5670
  12. // ============================================================================
  13. var _ Reactor = (*ReactorShim)(nil)
  14. type (
  15. messageValidator interface {
  16. Validate() error
  17. }
  18. // ReactorShim defines a generic shim wrapper around a BaseReactor. It is
  19. // responsible for wiring up legacy p2p behavior to the new p2p semantics
  20. // (e.g. proxying Envelope messages to legacy peers).
  21. ReactorShim struct {
  22. BaseReactor
  23. Name string
  24. PeerUpdates *PeerUpdatesCh
  25. Channels map[ChannelID]*ChannelShim
  26. }
  27. // ChannelShim defines a generic shim wrapper around a legacy p2p channel
  28. // and the new p2p Channel. It also includes the raw bi-directional Go channels
  29. // so we can proxy message delivery.
  30. ChannelShim struct {
  31. Descriptor *ChannelDescriptor
  32. Channel *Channel
  33. }
  34. // ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
  35. // and the proto.Message the new p2p Channel is responsible for handling.
  36. // A ChannelDescriptorShim is not contained in ReactorShim, but is rather
  37. // used to construct a ReactorShim.
  38. ChannelDescriptorShim struct {
  39. MsgType proto.Message
  40. Descriptor *ChannelDescriptor
  41. }
  42. )
  43. func NewReactorShim(name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
  44. channels := make(map[ChannelID]*ChannelShim)
  45. for _, cds := range descriptors {
  46. chShim := NewChannelShim(cds, 0)
  47. channels[chShim.Channel.id] = chShim
  48. }
  49. rs := &ReactorShim{
  50. Name: name,
  51. PeerUpdates: NewPeerUpdates(),
  52. Channels: channels,
  53. }
  54. rs.BaseReactor = *NewBaseReactor(name, rs)
  55. return rs
  56. }
  57. func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
  58. return &ChannelShim{
  59. Descriptor: cds.Descriptor,
  60. Channel: NewChannel(
  61. ChannelID(cds.Descriptor.ID),
  62. cds.MsgType,
  63. make(chan Envelope, buf),
  64. make(chan Envelope, buf),
  65. make(chan PeerError, buf),
  66. ),
  67. }
  68. }
  69. // proxyPeerEnvelopes iterates over each p2p Channel and starts a separate
  70. // go-routine where we listen for outbound envelopes sent during Receive
  71. // executions (or anything else that may send on the Channel) and proxy them to
  72. // the corresponding Peer using the To field from the envelope.
  73. func (rs *ReactorShim) proxyPeerEnvelopes() {
  74. for _, cs := range rs.Channels {
  75. go func(cs *ChannelShim) {
  76. for e := range cs.Channel.outCh {
  77. msg := proto.Clone(cs.Channel.messageType)
  78. msg.Reset()
  79. wrapper, ok := msg.(Wrapper)
  80. if ok {
  81. if err := wrapper.Wrap(e.Message); err != nil {
  82. rs.Logger.Error(
  83. "failed to proxy envelope; failed to wrap message",
  84. "ch_id", cs.Descriptor.ID,
  85. "msg", e.Message,
  86. "err", err,
  87. )
  88. continue
  89. }
  90. } else {
  91. msg = e.Message
  92. }
  93. bz, err := proto.Marshal(msg)
  94. if err != nil {
  95. rs.Logger.Error(
  96. "failed to proxy envelope; failed to encode message",
  97. "ch_id", cs.Descriptor.ID,
  98. "msg", e.Message,
  99. "err", err,
  100. )
  101. continue
  102. }
  103. switch {
  104. case e.Broadcast:
  105. rs.Switch.Broadcast(cs.Descriptor.ID, bz)
  106. case !e.To.Empty():
  107. src := rs.Switch.peers.Get(ID(e.To.String()))
  108. if src == nil {
  109. rs.Logger.Error(
  110. "failed to proxy envelope; failed to find peer",
  111. "ch_id", cs.Descriptor.ID,
  112. "msg", e.Message,
  113. "peer", e.To.String(),
  114. )
  115. continue
  116. }
  117. if !src.Send(cs.Descriptor.ID, bz) {
  118. rs.Logger.Error(
  119. "failed to proxy message to peer",
  120. "ch_id", cs.Descriptor.ID,
  121. "msg", e.Message,
  122. "peer", e.To.String(),
  123. )
  124. }
  125. default:
  126. rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID, "msg", e.Message)
  127. }
  128. }
  129. }(cs)
  130. }
  131. }
  132. // handlePeerErrors iterates over each p2p Channel and starts a separate go-routine
  133. // where we listen for peer errors. For each peer error, we find the peer from
  134. // the legacy p2p Switch and execute a StopPeerForError call with the corresponding
  135. // peer error.
  136. func (rs *ReactorShim) handlePeerErrors() {
  137. for _, cs := range rs.Channels {
  138. go func(cs *ChannelShim) {
  139. for pErr := range cs.Channel.errCh {
  140. if !pErr.PeerID.Empty() {
  141. peer := rs.Switch.peers.Get(ID(pErr.PeerID.String()))
  142. if peer == nil {
  143. rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID.String())
  144. continue
  145. }
  146. rs.Switch.StopPeerForError(peer, pErr.Err)
  147. }
  148. }
  149. }(cs)
  150. }
  151. }
  152. // OnStart executes the reactor shim's OnStart hook where we start all the
  153. // necessary go-routines in order to proxy peer envelopes and errors per p2p
  154. // Channel.
  155. func (rs *ReactorShim) OnStart() error {
  156. if rs.Switch == nil {
  157. return errors.New("proxyPeerEnvelopes: reactor shim switch is nil")
  158. }
  159. // start envelope proxying and peer error handling in separate go routines
  160. rs.proxyPeerEnvelopes()
  161. rs.handlePeerErrors()
  162. return nil
  163. }
  164. // GetChannel returns a p2p Channel reference for a given ChannelID. If no
  165. // Channel exists, nil is returned.
  166. func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
  167. channelShim, ok := rs.Channels[cID]
  168. if ok {
  169. return channelShim.Channel
  170. }
  171. return nil
  172. }
  173. // GetChannels implements the legacy Reactor interface for getting a slice of all
  174. // the supported ChannelDescriptors.
  175. func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
  176. sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
  177. for cID := range rs.Channels {
  178. sortedChIDs = append(sortedChIDs, cID)
  179. }
  180. sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })
  181. descriptors := make([]*ChannelDescriptor, len(rs.Channels))
  182. for i, cID := range sortedChIDs {
  183. descriptors[i] = rs.Channels[cID].Descriptor
  184. }
  185. return descriptors
  186. }
  187. // AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdateCh.
  188. // The embedding reactor must be sure to listen for messages on this channel to
  189. // handle adding a peer.
  190. func (rs *ReactorShim) AddPeer(peer Peer) {
  191. peerID, err := PeerIDFromString(string(peer.ID()))
  192. if err != nil {
  193. rs.Logger.Error("failed to add peer", "peer", peer.ID(), "err", err)
  194. return
  195. }
  196. select {
  197. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peerID, Status: PeerStatusUp}:
  198. rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peerID.String(), "status", PeerStatusUp)
  199. case <-rs.PeerUpdates.Done():
  200. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  201. // This is because there may be numerous spawned goroutines that are
  202. // attempting to send on the updateCh go channel and when the reactor stops
  203. // we do not want to preemptively close the channel as that could result in
  204. // panics sending on a closed channel. This also means that reactors MUST
  205. // be certain there are NO listeners on the updateCh channel when closing or
  206. // stopping.
  207. }
  208. }
  209. // RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdateCh.
  210. // The embedding reactor must be sure to listen for messages on this channel to
  211. // handle removing a peer.
  212. func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
  213. peerID, err := PeerIDFromString(string(peer.ID()))
  214. if err != nil {
  215. rs.Logger.Error("failed to remove peer", "peer", peer.ID(), "err", err)
  216. return
  217. }
  218. select {
  219. case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peerID, Status: PeerStatusDown}:
  220. rs.Logger.Debug(
  221. "sent peer update",
  222. "reactor", rs.Name,
  223. "peer", peerID.String(),
  224. "reason", reason,
  225. "status", PeerStatusDown,
  226. )
  227. case <-rs.PeerUpdates.Done():
  228. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  229. // This is because there may be numerous spawned goroutines that are
  230. // attempting to send on the updateCh go channel and when the reactor stops
  231. // we do not want to preemptively close the channel as that could result in
  232. // panics sending on a closed channel. This also means that reactors MUST
  233. // be certain there are NO listeners on the updateCh channel when closing or
  234. // stopping.
  235. }
  236. }
  237. // Receive implements a generic wrapper around implementing the Receive method
  238. // on the legacy Reactor p2p interface. If the reactor is running, Receive will
  239. // find the corresponding new p2p Channel, create and decode the appropriate
  240. // proto.Message from the msgBytes, execute any validation and finally construct
  241. // and send a p2p Envelope on the appropriate p2p Channel.
  242. func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
  243. if !rs.IsRunning() {
  244. return
  245. }
  246. cID := ChannelID(chID)
  247. channelShim, ok := rs.Channels[cID]
  248. if !ok {
  249. rs.Logger.Error("unexpected channel", "peer", src, "ch_id", chID)
  250. return
  251. }
  252. peerID, err := PeerIDFromString(string(src.ID()))
  253. if err != nil {
  254. rs.Logger.Error("failed to convert peer ID", "peer", src, "ch_id", chID, "err", err)
  255. return
  256. }
  257. msg := proto.Clone(channelShim.Channel.messageType)
  258. msg.Reset()
  259. if err := proto.Unmarshal(msgBytes, msg); err != nil {
  260. rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
  261. rs.Switch.StopPeerForError(src, err)
  262. return
  263. }
  264. validator, ok := msg.(messageValidator)
  265. if ok {
  266. if err := validator.Validate(); err != nil {
  267. rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
  268. rs.Switch.StopPeerForError(src, err)
  269. return
  270. }
  271. }
  272. wrapper, ok := msg.(Wrapper)
  273. if ok {
  274. var err error
  275. msg, err = wrapper.Unwrap()
  276. if err != nil {
  277. rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "msg", msg, "err", err)
  278. return
  279. }
  280. }
  281. select {
  282. case channelShim.Channel.inCh <- Envelope{From: peerID, Message: msg}:
  283. rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", peerID.String())
  284. case <-channelShim.Channel.Done():
  285. // NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel.
  286. // This is because there may be numerous spawned goroutines that are
  287. // attempting to send on the inbound channel and when the reactor stops we
  288. // do not want to preemptively close the channel as that could result in
  289. // panics sending on a closed channel. This also means that reactors MUST
  290. // be certain there are NO listeners on the inbound channel when closing or
  291. // stopping.
  292. }
  293. }