package statesync

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "time"

    abci "github.com/tendermint/tendermint/abci/types"
    "github.com/tendermint/tendermint/libs/log"
    "github.com/tendermint/tendermint/libs/service"
    tmsync "github.com/tendermint/tendermint/libs/sync"
    "github.com/tendermint/tendermint/p2p"
    ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
    "github.com/tendermint/tendermint/proxy"
    sm "github.com/tendermint/tendermint/state"
    "github.com/tendermint/tendermint/types"
)

var (
    _ service.Service = (*Reactor)(nil)
    _ p2p.Wrapper     = (*ssproto.Message)(nil)

    // ChannelShims contains a map of ChannelDescriptorShim objects, where each
    // object wraps a reference to a legacy p2p ChannelDescriptor and the
    // corresponding p2p proto.Message the new p2p Channel is responsible for
    // handling.
    //
    // TODO: Remove once p2p refactor is complete.
    // ref: https://github.com/tendermint/tendermint/issues/5670
    ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
        SnapshotChannel: {
            MsgType: new(ssproto.Message),
            Descriptor: &p2p.ChannelDescriptor{
                ID:                  byte(SnapshotChannel),
                Priority:            3,
                SendQueueCapacity:   10,
                RecvMessageCapacity: snapshotMsgSize,
            },
        },
        ChunkChannel: {
            MsgType: new(ssproto.Message),
            Descriptor: &p2p.ChannelDescriptor{
                ID:                  byte(ChunkChannel),
                Priority:            1,
                SendQueueCapacity:   4,
                RecvMessageCapacity: chunkMsgSize,
            },
        },
    }
)

const (
    // SnapshotChannel exchanges snapshot metadata
    SnapshotChannel = p2p.ChannelID(0x60)

    // ChunkChannel exchanges chunk contents
    ChunkChannel = p2p.ChannelID(0x61)

    // recentSnapshots is the number of recent snapshots to send and receive per peer.
    recentSnapshots = 10

    // snapshotMsgSize is the maximum size of a snapshotResponseMessage
    snapshotMsgSize = int(4e6)

    // chunkMsgSize is the maximum size of a chunkResponseMessage
    chunkMsgSize = int(16e6)
)

// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
    service.BaseService

    conn      proxy.AppConnSnapshot
    connQuery proxy.AppConnQuery
    tempDir   string

    snapshotCh  *p2p.Channel
    chunkCh     *p2p.Channel
    peerUpdates *p2p.PeerUpdatesCh
    closeCh     chan struct{}

    // This will only be set when a state sync is in progress. It is used to feed
    // received snapshots and chunks into the sync.
    mtx    tmsync.RWMutex
    syncer *syncer
}

// NewReactor returns a reference to a new state sync reactor, which implements
// the service.Service interface. It accepts a logger, connections for snapshots
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
    logger log.Logger,
    conn proxy.AppConnSnapshot,
    connQuery proxy.AppConnQuery,
    snapshotCh, chunkCh *p2p.Channel,
    peerUpdates *p2p.PeerUpdatesCh,
    tempDir string,
) *Reactor {
    r := &Reactor{
        conn:        conn,
        connQuery:   connQuery,
        snapshotCh:  snapshotCh,
        chunkCh:     chunkCh,
        peerUpdates: peerUpdates,
        closeCh:     make(chan struct{}),
        tempDir:     tempDir,
    }

    r.BaseService = *service.NewBaseService(logger, "StateSync", r)
    return r
}

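// A minimal construction sketch (illustrative only; how the snapshot and chunk
// Channels and the PeerUpdatesCh are created depends on the node's p2p router
// wiring, which lives outside this package):
//
//     r := NewReactor(logger, snapshotConn, queryConn, snapshotCh, chunkCh, peerUpdates, tempDir)
//     if err := r.Start(); err != nil {
//         // handle startup error
//     }
//     defer func() { _ = r.Stop() }()
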
// OnStart starts separate goroutines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart() error {
    // Listen for envelopes on the snapshot p2p Channel in a separate goroutine
    // so as not to block or cause IO contention with the chunk p2p Channel. Note,
    // we do not launch a goroutine to handle individual envelopes so as to avoid
    // having to deal with bounding workers or pools.
    go r.processSnapshotCh()

    // Listen for envelopes on the chunk p2p Channel in a separate goroutine
    // so as not to block or cause IO contention with the snapshot p2p Channel. Note,
    // we do not launch a goroutine to handle individual envelopes so as to avoid
    // having to deal with bounding workers or pools.
    go r.processChunkCh()

    go r.processPeerUpdates()

    return nil
}

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
    // Close closeCh to signal to all spawned goroutines to gracefully exit. All
    // p2p Channels should execute Close().
    close(r.closeCh)

    // Wait for all p2p Channels to be closed before returning. This ensures we
    // can easily reason about synchronization of all p2p Channels and ensure no
    // panics will occur.
    <-r.snapshotCh.Done()
    <-r.chunkCh.Done()
    <-r.peerUpdates.Done()
}

// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
    switch msg := envelope.Message.(type) {
    case *ssproto.SnapshotsRequest:
        snapshots, err := r.recentSnapshots(recentSnapshots)
        if err != nil {
            r.Logger.Error("failed to fetch snapshots", "err", err)
            return nil
        }

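        // Each advertised snapshot is sent as its own SnapshotsResponse
        // envelope; the protocol does not batch them into a single message.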
        for _, snapshot := range snapshots {
            r.Logger.Debug(
                "advertising snapshot",
                "height", snapshot.Height,
                "format", snapshot.Format,
                "peer", envelope.From.String(),
            )
            r.snapshotCh.Out() <- p2p.Envelope{
                To: envelope.From,
                Message: &ssproto.SnapshotsResponse{
                    Height:   snapshot.Height,
                    Format:   snapshot.Format,
                    Chunks:   snapshot.Chunks,
                    Hash:     snapshot.Hash,
                    Metadata: snapshot.Metadata,
                },
            }
        }

    case *ssproto.SnapshotsResponse:
        r.mtx.RLock()
        defer r.mtx.RUnlock()

        if r.syncer == nil {
            r.Logger.Debug("received unexpected snapshot; no state sync in progress")
            return nil
        }

        r.Logger.Debug(
            "received snapshot",
            "height", msg.Height,
            "format", msg.Format,
            "peer", envelope.From.String(),
        )

        _, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
            Height:   msg.Height,
            Format:   msg.Format,
            Chunks:   msg.Chunks,
            Hash:     msg.Hash,
            Metadata: msg.Metadata,
        })
        if err != nil {
            r.Logger.Error(
                "failed to add snapshot",
                "height", msg.Height,
                "format", msg.Format,
                "err", err,
                "channel", r.snapshotCh.ID,
            )
            return nil
        }

    default:
        r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From.String())
        return fmt.Errorf("received unknown message: %T", msg)
    }

    return nil
}

// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error {
    switch msg := envelope.Message.(type) {
    case *ssproto.ChunkRequest:
        r.Logger.Debug(
            "received chunk request",
            "height", msg.Height,
            "format", msg.Format,
            "chunk", msg.Index,
            "peer", envelope.From.String(),
        )

        resp, err := r.conn.LoadSnapshotChunkSync(context.Background(), abci.RequestLoadSnapshotChunk{
            Height: msg.Height,
            Format: msg.Format,
            Chunk:  msg.Index,
        })
        if err != nil {
            r.Logger.Error(
                "failed to load chunk",
                "height", msg.Height,
                "format", msg.Format,
                "chunk", msg.Index,
                "err", err,
                "peer", envelope.From.String(),
            )
            return nil
        }

        r.Logger.Debug(
            "sending chunk",
            "height", msg.Height,
            "format", msg.Format,
            "chunk", msg.Index,
            "peer", envelope.From.String(),
        )

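        // A nil chunk from the application indicates this node does not have the
        // requested chunk; the response is still sent, flagged as Missing, rather
        // than being dropped.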
        r.chunkCh.Out() <- p2p.Envelope{
            To: envelope.From,
            Message: &ssproto.ChunkResponse{
                Height:  msg.Height,
                Format:  msg.Format,
                Index:   msg.Index,
                Chunk:   resp.Chunk,
                Missing: resp.Chunk == nil,
            },
        }

    case *ssproto.ChunkResponse:
        r.mtx.RLock()
        defer r.mtx.RUnlock()

        if r.syncer == nil {
            r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From.String())
            return nil
        }

        r.Logger.Debug(
            "received chunk; adding to sync",
            "height", msg.Height,
            "format", msg.Format,
            "chunk", msg.Index,
            "peer", envelope.From.String(),
        )

        _, err := r.syncer.AddChunk(&chunk{
            Height: msg.Height,
            Format: msg.Format,
            Index:  msg.Index,
            Chunk:  msg.Chunk,
            Sender: envelope.From,
        })
        if err != nil {
            r.Logger.Error(
                "failed to add chunk",
                "height", msg.Height,
                "format", msg.Format,
                "chunk", msg.Index,
                "err", err,
                "peer", envelope.From.String(),
            )
            return nil
        }

    default:
        r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From.String())
        return fmt.Errorf("received unknown message: %T", msg)
    }

    return nil
}

// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error("recovering from processing message panic", "err", err)
        }
    }()

    switch chID {
    case SnapshotChannel:
        err = r.handleSnapshotMessage(envelope)

    case ChunkChannel:
        err = r.handleChunkMessage(envelope)

    default:
        err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
    }

    return err
}

// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel. Any error encountered during message
// execution will result in a PeerError being sent on the SnapshotChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processSnapshotCh() {
    defer r.snapshotCh.Close()

    for {
        select {
        case envelope := <-r.snapshotCh.In():
            if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
                r.snapshotCh.Error() <- p2p.PeerError{
                    PeerID:   envelope.From,
                    Err:      err,
                    Severity: p2p.PeerErrorSeverityLow,
                }
            }

        case <-r.closeCh:
            r.Logger.Debug("stopped listening on snapshot channel; closing...")
            return
        }
    }
}

// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel. Any error encountered during message
// execution will result in a PeerError being sent on the ChunkChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processChunkCh() {
    defer r.chunkCh.Close()

    for {
        select {
        case envelope := <-r.chunkCh.In():
            if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
                r.chunkCh.Error() <- p2p.PeerError{
                    PeerID:   envelope.From,
                    Err:      err,
                    Severity: p2p.PeerErrorSeverityLow,
                }
            }

        case <-r.closeCh:
            r.Logger.Debug("stopped listening on chunk channel; closing...")
            return
        }
    }
}

// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing peer update: %v", e)
            r.Logger.Error("recovering from processing peer update panic", "err", err)
        }
    }()

    r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID.String(), "status", peerUpdate.Status)

    r.mtx.RLock()
    defer r.mtx.RUnlock()

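    // Peer updates only matter while a state sync is in progress; otherwise they
    // are logged above and ignored.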
    if r.syncer != nil {
        switch peerUpdate.Status {
        case p2p.PeerStatusNew, p2p.PeerStatusUp:
            r.syncer.AddPeer(peerUpdate.PeerID)

        case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
            r.syncer.RemovePeer(peerUpdate.PeerID)
        }
    }

    return err
}

// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
    defer r.peerUpdates.Close()

    for {
        select {
        case peerUpdate := <-r.peerUpdates.Updates():
            _ = r.processPeerUpdate(peerUpdate)

        case <-r.closeCh:
            r.Logger.Debug("stopped listening on peer updates channel; closing...")
            return
        }
    }
}

// recentSnapshots fetches the n most recent snapshots from the app.
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
    resp, err := r.conn.ListSnapshotsSync(context.Background(), abci.RequestListSnapshots{})
    if err != nil {
        return nil, err
    }

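    // Sort snapshots newest-first: by height in descending order, breaking ties
    // by preferring the higher format.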
    sort.Slice(resp.Snapshots, func(i, j int) bool {
        a := resp.Snapshots[i]
        b := resp.Snapshots[j]

        switch {
        case a.Height > b.Height:
            return true

        case a.Height == b.Height && a.Format > b.Format:
            return true

        default:
            return false
        }
    })

    snapshots := make([]*snapshot, 0, n)
    for i, s := range resp.Snapshots {
        if i >= int(n) {
            break
        }

        snapshots = append(snapshots, &snapshot{
            Height:   s.Height,
            Format:   s.Format,
            Chunks:   s.Chunks,
            Hash:     s.Hash,
            Metadata: s.Metadata,
        })
    }

    return snapshots, nil
}

// Sync runs a state sync, returning the new state and last commit at the snapshot height.
// The caller must store the state and commit in the state database and block store.
func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
    r.mtx.Lock()
    if r.syncer != nil {
        r.mtx.Unlock()
        return sm.State{}, nil, errors.New("a state sync is already in progress")
    }

    r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out(), r.chunkCh.Out(), r.tempDir)
    r.mtx.Unlock()

    // request snapshots from all currently connected peers
    r.Logger.Debug("requesting snapshots from known peers")
    r.snapshotCh.Out() <- p2p.Envelope{
        Broadcast: true,
        Message:   &ssproto.SnapshotsRequest{},
    }

    state, commit, err := r.syncer.SyncAny(discoveryTime)

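    // Once the sync has completed (or failed), clear the syncer under lock so
    // that incoming snapshot and chunk envelopes are ignored again.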
    r.mtx.Lock()
    r.syncer = nil
    r.mtx.Unlock()

    return state, commit, err
}

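// A sketch of how a caller might drive Sync, assuming the reactor has already
// been started. The stateProvider, stateStore, and blockStore identifiers are
// illustrative assumptions and are not defined in this package:
//
//     state, commit, err := reactor.Sync(stateProvider, 10*time.Second)
//     if err != nil {
//         return err
//     }
//     // Per the Sync contract, the caller persists the results itself.
//     if err := stateStore.Bootstrap(state); err != nil {
//         return err
//     }
//     if err := blockStore.SaveSeenCommit(state.LastBlockHeight, commit); err != nil {
//         return err
//     }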