You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

331 lines
9.8 KiB

  1. package p2p
  2. import (
  3. "errors"
  4. "sort"
  5. "github.com/gogo/protobuf/proto"
  6. "github.com/tendermint/tendermint/libs/log"
  7. )
  8. // ============================================================================
  9. // TODO: Types and business logic below are temporary and will be removed once
  10. // the legacy p2p stack is removed in favor of the new model.
  11. //
  12. // ref: https://github.com/tendermint/tendermint/issues/5670
  13. // ============================================================================
  14. var _ Reactor = (*ReactorShim)(nil)
  15. type (
  16. messageValidator interface {
  17. Validate() error
  18. }
  19. // ReactorShim defines a generic shim wrapper around a BaseReactor. It is
  20. // responsible for wiring up legacy p2p behavior to the new p2p semantics
  21. // (e.g. proxying Envelope messages to legacy peers).
  22. ReactorShim struct {
  23. BaseReactor
  24. Name string
  25. PeerUpdates *PeerUpdates
  26. Channels map[ChannelID]*ChannelShim
  27. }
  28. // ChannelShim defines a generic shim wrapper around a legacy p2p channel
  29. // and the new p2p Channel. It also includes the raw bi-directional Go channels
  30. // so we can proxy message delivery.
  31. ChannelShim struct {
  32. Descriptor *ChannelDescriptor
  33. Channel *Channel
  34. inCh chan<- Envelope
  35. outCh <-chan Envelope
  36. errCh <-chan PeerError
  37. }
  38. // ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
  39. // and the proto.Message the new p2p Channel is responsible for handling.
  40. // A ChannelDescriptorShim is not contained in ReactorShim, but is rather
  41. // used to construct a ReactorShim.
  42. ChannelDescriptorShim struct {
  43. MsgType proto.Message
  44. Descriptor *ChannelDescriptor
  45. }
  46. )
  47. func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*ChannelDescriptorShim) *ReactorShim {
  48. channels := make(map[ChannelID]*ChannelShim)
  49. for _, cds := range descriptors {
  50. chShim := NewChannelShim(cds, 0)
  51. channels[chShim.Channel.ID] = chShim
  52. }
  53. rs := &ReactorShim{
  54. Name: name,
  55. PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
  56. Channels: channels,
  57. }
  58. rs.BaseReactor = *NewBaseReactor(name, rs)
  59. rs.SetLogger(logger)
  60. return rs
  61. }
  62. func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
  63. inCh := make(chan Envelope, buf)
  64. outCh := make(chan Envelope, buf)
  65. errCh := make(chan PeerError, buf)
  66. return &ChannelShim{
  67. Descriptor: cds.Descriptor,
  68. Channel: NewChannel(
  69. ChannelID(cds.Descriptor.ID),
  70. cds.MsgType,
  71. inCh,
  72. outCh,
  73. errCh,
  74. ),
  75. inCh: inCh,
  76. outCh: outCh,
  77. errCh: errCh,
  78. }
  79. }
  80. // proxyPeerEnvelopes iterates over each p2p Channel and starts a separate
  81. // go-routine where we listen for outbound envelopes sent during Receive
  82. // executions (or anything else that may send on the Channel) and proxy them to
  83. // the corresponding Peer using the To field from the envelope.
  84. func (rs *ReactorShim) proxyPeerEnvelopes() {
  85. for _, cs := range rs.Channels {
  86. go func(cs *ChannelShim) {
  87. for e := range cs.outCh {
  88. msg := proto.Clone(cs.Channel.messageType)
  89. msg.Reset()
  90. wrapper, ok := msg.(Wrapper)
  91. if ok {
  92. if err := wrapper.Wrap(e.Message); err != nil {
  93. rs.Logger.Error(
  94. "failed to proxy envelope; failed to wrap message",
  95. "ch_id", cs.Descriptor.ID,
  96. "err", err,
  97. )
  98. continue
  99. }
  100. } else {
  101. msg = e.Message
  102. }
  103. bz, err := proto.Marshal(msg)
  104. if err != nil {
  105. rs.Logger.Error(
  106. "failed to proxy envelope; failed to encode message",
  107. "ch_id", cs.Descriptor.ID,
  108. "err", err,
  109. )
  110. continue
  111. }
  112. switch {
  113. case e.Broadcast:
  114. rs.Switch.Broadcast(cs.Descriptor.ID, bz)
  115. case e.To != "":
  116. src := rs.Switch.peers.Get(e.To)
  117. if src == nil {
  118. rs.Logger.Debug(
  119. "failed to proxy envelope; failed to find peer",
  120. "ch_id", cs.Descriptor.ID,
  121. "peer", e.To,
  122. )
  123. continue
  124. }
  125. if !src.Send(cs.Descriptor.ID, bz) {
  126. rs.Logger.Error(
  127. "failed to proxy message to peer",
  128. "ch_id", cs.Descriptor.ID,
  129. "peer", e.To,
  130. )
  131. }
  132. default:
  133. rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID)
  134. }
  135. }
  136. }(cs)
  137. }
  138. }
  139. // handlePeerErrors iterates over each p2p Channel and starts a separate go-routine
  140. // where we listen for peer errors. For each peer error, we find the peer from
  141. // the legacy p2p Switch and execute a StopPeerForError call with the corresponding
  142. // peer error.
  143. func (rs *ReactorShim) handlePeerErrors() {
  144. for _, cs := range rs.Channels {
  145. go func(cs *ChannelShim) {
  146. for pErr := range cs.errCh {
  147. if pErr.NodeID != "" {
  148. peer := rs.Switch.peers.Get(pErr.NodeID)
  149. if peer == nil {
  150. rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.NodeID)
  151. continue
  152. }
  153. rs.Switch.StopPeerForError(peer, pErr.Err)
  154. }
  155. }
  156. }(cs)
  157. }
  158. }
  159. // OnStart executes the reactor shim's OnStart hook where we start all the
  160. // necessary go-routines in order to proxy peer envelopes and errors per p2p
  161. // Channel.
  162. func (rs *ReactorShim) OnStart() error {
  163. if rs.Switch == nil {
  164. return errors.New("proxyPeerEnvelopes: reactor shim switch is nil")
  165. }
  166. // start envelope proxying and peer error handling in separate go routines
  167. rs.proxyPeerEnvelopes()
  168. rs.handlePeerErrors()
  169. return nil
  170. }
  171. // GetChannel returns a p2p Channel reference for a given ChannelID. If no
  172. // Channel exists, nil is returned.
  173. func (rs *ReactorShim) GetChannel(cID ChannelID) *Channel {
  174. channelShim, ok := rs.Channels[cID]
  175. if ok {
  176. return channelShim.Channel
  177. }
  178. return nil
  179. }
  180. // GetChannels implements the legacy Reactor interface for getting a slice of all
  181. // the supported ChannelDescriptors.
  182. func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
  183. sortedChIDs := make([]ChannelID, 0, len(rs.Channels))
  184. for cID := range rs.Channels {
  185. sortedChIDs = append(sortedChIDs, cID)
  186. }
  187. sort.Slice(sortedChIDs, func(i, j int) bool { return sortedChIDs[i] < sortedChIDs[j] })
  188. descriptors := make([]*ChannelDescriptor, len(rs.Channels))
  189. for i, cID := range sortedChIDs {
  190. descriptors[i] = rs.Channels[cID].Descriptor
  191. }
  192. return descriptors
  193. }
  194. // AddPeer sends a PeerUpdate with status PeerStatusUp on the PeerUpdateCh.
  195. // The embedding reactor must be sure to listen for messages on this channel to
  196. // handle adding a peer.
  197. func (rs *ReactorShim) AddPeer(peer Peer) {
  198. select {
  199. case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
  200. rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)
  201. case <-rs.PeerUpdates.Done():
  202. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  203. // This is because there may be numerous spawned goroutines that are
  204. // attempting to send on the updateCh go channel and when the reactor stops
  205. // we do not want to preemptively close the channel as that could result in
  206. // panics sending on a closed channel. This also means that reactors MUST
  207. // be certain there are NO listeners on the updateCh channel when closing or
  208. // stopping.
  209. }
  210. }
  211. // RemovePeer sends a PeerUpdate with status PeerStatusDown on the PeerUpdateCh.
  212. // The embedding reactor must be sure to listen for messages on this channel to
  213. // handle removing a peer.
  214. func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
  215. select {
  216. case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
  217. rs.Logger.Debug(
  218. "sent peer update",
  219. "reactor", rs.Name,
  220. "peer", peer.ID(),
  221. "reason", reason,
  222. "status", PeerStatusDown,
  223. )
  224. case <-rs.PeerUpdates.Done():
  225. // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
  226. // This is because there may be numerous spawned goroutines that are
  227. // attempting to send on the updateCh go channel and when the reactor stops
  228. // we do not want to preemptively close the channel as that could result in
  229. // panics sending on a closed channel. This also means that reactors MUST
  230. // be certain there are NO listeners on the updateCh channel when closing or
  231. // stopping.
  232. }
  233. }
  234. // Receive implements a generic wrapper around implementing the Receive method
  235. // on the legacy Reactor p2p interface. If the reactor is running, Receive will
  236. // find the corresponding new p2p Channel, create and decode the appropriate
  237. // proto.Message from the msgBytes, execute any validation and finally construct
  238. // and send a p2p Envelope on the appropriate p2p Channel.
  239. func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
  240. if !rs.IsRunning() {
  241. return
  242. }
  243. cID := ChannelID(chID)
  244. channelShim, ok := rs.Channels[cID]
  245. if !ok {
  246. rs.Logger.Error("unexpected channel", "peer", src, "ch_id", chID)
  247. return
  248. }
  249. msg := proto.Clone(channelShim.Channel.messageType)
  250. msg.Reset()
  251. if err := proto.Unmarshal(msgBytes, msg); err != nil {
  252. rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "err", err)
  253. rs.Switch.StopPeerForError(src, err)
  254. return
  255. }
  256. validator, ok := msg.(messageValidator)
  257. if ok {
  258. if err := validator.Validate(); err != nil {
  259. rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "err", err)
  260. rs.Switch.StopPeerForError(src, err)
  261. return
  262. }
  263. }
  264. wrapper, ok := msg.(Wrapper)
  265. if ok {
  266. var err error
  267. msg, err = wrapper.Unwrap()
  268. if err != nil {
  269. rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "err", err)
  270. return
  271. }
  272. }
  273. select {
  274. case channelShim.inCh <- Envelope{From: src.ID(), Message: msg}:
  275. rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID())
  276. case <-channelShim.Channel.Done():
  277. // NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel.
  278. // This is because there may be numerous spawned goroutines that are
  279. // attempting to send on the inbound channel and when the reactor stops we
  280. // do not want to preemptively close the channel as that could result in
  281. // panics sending on a closed channel. This also means that reactors MUST
  282. // be certain there are NO listeners on the inbound channel when closing or
  283. // stopping.
  284. }
  285. }