package p2p

import (
	"errors"
	"sort"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/libs/log"
)
// ============================================================================
// TODO: Types and business logic below are temporary and will be removed once
// the legacy p2p stack is removed in favor of the new model.
//
// ref: https://github.com/tendermint/tendermint/issues/5670
// ============================================================================

var _ Reactor = (*ReactorShim)(nil)

type (
	messageValidator interface {
		Validate() error
	}

	// ReactorShim defines a generic shim wrapper around a BaseReactor. It is
	// responsible for wiring up legacy p2p behavior to the new p2p semantics
	// (e.g. proxying Envelope messages to legacy peers).
	ReactorShim struct {
		BaseReactor

		Name        string
		PeerUpdates *PeerUpdates
		Channels    map[ChannelID]*ChannelShim
	}

	// ChannelShim defines a generic shim wrapper around a legacy p2p channel
	// and the new p2p Channel. It also includes the raw bi-directional Go
	// channels so we can proxy message delivery.
	ChannelShim struct {
		Descriptor *ChannelDescriptor
		Channel    *Channel
		inCh       chan<- Envelope
		outCh      <-chan Envelope
		errCh      <-chan PeerError
	}

	// ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
	// and the proto.Message the new p2p Channel is responsible for handling.
	// A ChannelDescriptorShim is not contained in ReactorShim, but is rather
	// used to construct a ReactorShim.
	ChannelDescriptorShim struct {
		MsgType    proto.Message
		Descriptor *ChannelDescriptor
	}
)
// NewReactorShim returns a new ReactorShim for the given reactor name, wiring
// up a ChannelShim for each provided channel descriptor.
func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
	channels := make(map[ChannelID]*ChannelShim)

	for _, cds := range descriptors {
		chShim := NewChannelShim(cds, 0)
		channels[chShim.Channel.ID] = chShim
	}

	rs := &ReactorShim{
		Name:        name,
		PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
		Channels:    channels,
	}

	rs.BaseReactor = *NewBaseReactor(name, rs)
	rs.SetLogger(logger)

	return rs
}
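
// Example: constructing a shim for a single channel. A minimal sketch; the
// channel ID, priority, message type, and logger below are hypothetical
// placeholders, not values taken from this package:
//
//	descriptors := map[ChannelID]*ChannelDescriptorShim{
//		0x20: {
//			MsgType:    new(tmcons.Message), // any concrete proto.Message
//			Descriptor: &ChannelDescriptor{ID: 0x20, Priority: 5},
//		},
//	}
//	shim := NewReactorShim(logger, "ConsensusShim", descriptors)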
// NewChannelShim returns a new ChannelShim for the given channel descriptor
// shim, creating the underlying new p2p Channel with Go channels of the
// provided buffer size.
func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
	inCh := make(chan Envelope, buf)
	outCh := make(chan Envelope, buf)
	errCh := make(chan PeerError, buf)

	return &ChannelShim{
		Descriptor: cds.Descriptor,
		Channel: NewChannel(
			ChannelID(cds.Descriptor.ID),
			cds.MsgType,
			inCh,
			outCh,
			errCh,
		),
		inCh:  inCh,
		outCh: outCh,
		errCh: errCh,
	}
}
// proxyPeerEnvelopes iterates over each p2p Channel and starts a separate
// go-routine where we listen for outbound envelopes sent during Receive
// executions (or anything else that may send on the Channel) and proxy them to
// the corresponding Peer using the To field from the envelope.
func (rs *ReactorShim) proxyPeerEnvelopes() {
	for _, cs := range rs.Channels {
		go func(cs *ChannelShim) {
			for e := range cs.outCh {
				msg := proto.Clone(cs.Channel.messageType)
				msg.Reset()

				wrapper, ok := msg.(Wrapper)
				if ok {
					if err := wrapper.Wrap(e.Message); err != nil {
						rs.Logger.Error(
							"failed to proxy envelope; failed to wrap message",
							"ch_id", cs.Descriptor.ID,
							"msg", e.Message,
							"err", err,
						)
						continue
					}
				} else {
					msg = e.Message
				}

				bz, err := proto.Marshal(msg)
				if err != nil {
					rs.Logger.Error(
						"failed to proxy envelope; failed to encode message",
						"ch_id", cs.Descriptor.ID,
						"msg", e.Message,
						"err", err,
					)
					continue
				}

				switch {
				case e.Broadcast:
					rs.Switch.Broadcast(cs.Descriptor.ID, bz)

				case e.To != "":
					src := rs.Switch.peers.Get(e.To)
					if src == nil {
						rs.Logger.Debug(
							"failed to proxy envelope; failed to find peer",
							"ch_id", cs.Descriptor.ID,
							"msg", e.Message,
							"peer", e.To,
						)
						continue
					}

					if !src.Send(cs.Descriptor.ID, bz) {
						rs.Logger.Error(
							"failed to proxy message to peer",
							"ch_id", cs.Descriptor.ID,
							"msg", e.Message,
							"peer", e.To,
						)
					}

				default:
					rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID, "msg", e.Message)
				}
			}
		}(cs)
	}
}
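
// From the new p2p stack's side, anything a reactor sends outbound on the
// shimmed Channel is drained by the go-routine above and relayed through the
// legacy Switch. A rough sketch, assuming the Channel exposes its outbound Go
// channel as an Out field (the exact accessor may differ):
//
//	ch := shim.GetChannel(ChannelID(0x20))
//	ch.Out <- Envelope{Broadcast: true, Message: msg} // fan out to all peers
//	ch.Out <- Envelope{To: peerID, Message: msg}      // target a single peer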
// handlePeerErrors iterates over each p2p Channel and starts a separate
// go-routine where we listen for peer errors. For each peer error, we find the
// peer from the legacy p2p Switch and execute a StopPeerForError call with the
// corresponding peer error.
func (rs *ReactorShim) handlePeerErrors() {
	for _, cs := range rs.Channels {
		go func(cs *ChannelShim) {
			for pErr := range cs.errCh {
				if pErr.NodeID != "" {
					peer := rs.Switch.peers.Get(pErr.NodeID)
					if peer == nil {
						rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.NodeID)
						continue
					}

					rs.Switch.StopPeerForError(peer, pErr.Err)
				}
			}
		}(cs)
	}
}
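
// Peer errors flow the same way: a reactor reports a misbehaving peer by
// sending on the Channel's error Go channel, and the go-routine above turns
// that into a legacy StopPeerForError call. Sketch, assuming the error
// channel is exposed as an Error field (the exact accessor may differ):
//
//	ch.Error <- PeerError{NodeID: peerID, Err: errors.New("bad block part")}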
// OnStart executes the reactor shim's OnStart hook where we start all the
// necessary go-routines in order to proxy peer envelopes and errors per p2p
// Channel.
func (rs *ReactorShim) OnStart() error {
	if rs.Switch == nil {
		return errors.New("proxyPeerEnvelopes: reactor shim switch is nil")
	}

	// start envelope proxying and peer error handling in separate go-routines
	rs.proxyPeerEnvelopes()
	rs.handlePeerErrors()

	return nil
}
// GetChannel returns a p2p Channel reference for a given ChannelID. If no
// Channel exists, nil is returned.
func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
	channelShim, ok := rs.Channels[cID]
	if ok {
		return channelShim.Channel
	}

	return nil
}

// GetChannels implements the legacy Reactor interface for getting a slice of
// all the supported ChannelDescriptors, ordered by ascending ChannelID.
func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
	sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
	for cID := range rs.Channels {
		sortedChIDs = append(sortedChIDs, cID)
	}

	sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })

	descriptors := make([]*ChannelDescriptor, len(rs.Channels))
	for i, cID := range sortedChIDs {
		descriptors[i] = rs.Channels[cID].Descriptor
	}

	return descriptors
}
// AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdates
// channel. The embedding reactor must be sure to listen for messages on this
// channel to handle adding a peer.
func (rs *ReactorShim) AddPeer(peer Peer) {
	select {
	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
		rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)

	case <-rs.PeerUpdates.Done():
		// NOTE: We explicitly DO NOT close the PeerUpdates' updatesCh go channel.
		// This is because there may be numerous spawned goroutines that are
		// attempting to send on the updatesCh go channel and when the reactor
		// stops we do not want to preemptively close the channel as that could
		// result in panics sending on a closed channel. This also means that
		// reactors MUST be certain there are NO listeners on the updatesCh
		// channel when closing or stopping.
	}
}

// RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdates
// channel. The embedding reactor must be sure to listen for messages on this
// channel to handle removing a peer.
func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
	select {
	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
		rs.Logger.Debug(
			"sent peer update",
			"reactor", rs.Name,
			"peer", peer.ID(),
			"reason", reason,
			"status", PeerStatusDown,
		)

	case <-rs.PeerUpdates.Done():
		// NOTE: We explicitly DO NOT close the PeerUpdates' updatesCh go channel
		// (see the note in AddPeer above).
	}
}
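
// An embedding reactor consumes these updates from rs.PeerUpdates. Sketch,
// assuming PeerUpdates exposes its updates via an Updates() accessor (the
// exact accessor may differ):
//
//	go func() {
//		for update := range rs.PeerUpdates.Updates() {
//			switch update.Status {
//			case PeerStatusUp:
//				// begin tracking/sending to update.NodeID
//			case PeerStatusDown:
//				// stop tracking update.NodeID
//			}
//		}
//	}()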
// Receive implements a generic wrapper around implementing the Receive method
// on the legacy Reactor p2p interface. If the reactor is running, Receive will
// find the corresponding new p2p Channel, create and decode the appropriate
// proto.Message from the msgBytes, execute any validation and finally construct
// and send a p2p Envelope on the appropriate p2p Channel.
func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
	if !rs.IsRunning() {
		return
	}

	cID := ChannelID(chID)

	channelShim, ok := rs.Channels[cID]
	if !ok {
		rs.Logger.Error("unexpected channel", "peer", src, "ch_id", chID)
		return
	}

	msg := proto.Clone(channelShim.Channel.messageType)
	msg.Reset()

	if err := proto.Unmarshal(msgBytes, msg); err != nil {
		rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
		rs.Switch.StopPeerForError(src, err)
		return
	}

	validator, ok := msg.(messageValidator)
	if ok {
		if err := validator.Validate(); err != nil {
			rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
			rs.Switch.StopPeerForError(src, err)
			return
		}
	}

	wrapper, ok := msg.(Wrapper)
	if ok {
		var err error

		msg, err = wrapper.Unwrap()
		if err != nil {
			rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "msg", msg, "err", err)
			return
		}
	}

	select {
	case channelShim.inCh <- Envelope{From: src.ID(), Message: msg}:
		rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID())

	case <-channelShim.Channel.Done():
		// NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel.
		// This is because there may be numerous spawned goroutines that are
		// attempting to send on the inbound channel and when the reactor stops we
		// do not want to preemptively close the channel as that could result in
		// panics sending on a closed channel. This also means that reactors MUST
		// be certain there are NO listeners on the inbound channel when closing or
		// stopping.
	}
}
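
// On the receiving end, the new-style reactor simply drains the Channel's
// inbound Go channel to obtain the decoded, validated envelopes. Sketch,
// assuming the inbound channel is exposed as an In field (the exact accessor
// may differ):
//
//	for e := range ch.In {
//		rs.Logger.Debug("received message", "peer", e.From, "msg", e.Message)
//	}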