package statesync

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	tmsync "github.com/tendermint/tendermint/libs/sync"
	"github.com/tendermint/tendermint/p2p"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

var (
	_ service.Service = (*Reactor)(nil)
	_ p2p.Wrapper     = (*ssproto.Message)(nil)

	// ChannelShims contains a map of ChannelDescriptorShim objects, where each
	// object wraps a reference to a legacy p2p ChannelDescriptor and the
	// corresponding p2p proto.Message the new p2p Channel is responsible for
	// handling.
	//
	// TODO: Remove once p2p refactor is complete.
	// ref: https://github.com/tendermint/tendermint/issues/5670
	ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
		SnapshotChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(SnapshotChannel),
				Priority:            5,
				SendQueueCapacity:   10,
				RecvMessageCapacity: snapshotMsgSize,
			},
		},
		ChunkChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(ChunkChannel),
				Priority:            1,
				SendQueueCapacity:   4,
				RecvMessageCapacity: chunkMsgSize,
			},
		},
	}
)

const (
	// SnapshotChannel exchanges snapshot metadata.
	SnapshotChannel = p2p.ChannelID(0x60)

	// ChunkChannel exchanges chunk contents.
	ChunkChannel = p2p.ChannelID(0x61)

	// recentSnapshots is the number of recent snapshots to send and receive per peer.
	recentSnapshots = 10

	// snapshotMsgSize is the maximum size of a snapshotResponseMessage.
	snapshotMsgSize = int(4e6)

	// chunkMsgSize is the maximum size of a chunkResponseMessage.
	chunkMsgSize = int(16e6)
)

// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
	service.BaseService

	conn      proxy.AppConnSnapshot
	connQuery proxy.AppConnQuery
	tempDir   string

	snapshotCh  *p2p.Channel
	chunkCh     *p2p.Channel
	peerUpdates *p2p.PeerUpdates
	closeCh     chan struct{}

	// This will only be set when a state sync is in progress. It is used to feed
	// received snapshots and chunks into the sync.
	mtx    tmsync.RWMutex
	syncer *syncer
}

// NewReactor returns a reference to a new state sync reactor, which implements
// the service.Service interface. It accepts a logger, connections for snapshots
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
	logger log.Logger,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	snapshotCh, chunkCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
	tempDir string,
) *Reactor {
	r := &Reactor{
		conn:        conn,
		connQuery:   connQuery,
		snapshotCh:  snapshotCh,
		chunkCh:     chunkCh,
		peerUpdates: peerUpdates,
		closeCh:     make(chan struct{}),
		tempDir:     tempDir,
	}

	r.BaseService = *service.NewBaseService(logger, "StateSync", r)
	return r
}

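// Example (hypothetical wiring; assumes the snapshot and chunk Channels and the
// PeerUpdates subscription were obtained from the p2p layer elsewhere, e.g. via
// a router configured with ChannelShims):
//
//	r := NewReactor(logger, conn, connQuery, snapshotCh, chunkCh, peerUpdates, os.TempDir())
//	if err := r.Start(); err != nil {
//		// handle startup failure
//	}
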
// OnStart starts separate goroutines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart() error {
	// Listen for envelopes on the snapshot p2p Channel in a separate goroutine
	// so as not to block or cause IO contention with the chunk p2p Channel.
	// Note, we do not launch a goroutine to handle individual envelopes so as
	// not to have to deal with bounding workers or pools.
	go r.processSnapshotCh()

	// Listen for envelopes on the chunk p2p Channel in a separate goroutine
	// so as not to block or cause IO contention with the snapshot p2p Channel.
	// Note, we do not launch a goroutine to handle individual envelopes so as
	// not to have to deal with bounding workers or pools.
	go r.processChunkCh()

	go r.processPeerUpdates()

	return nil
}

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
	// p2p Channels should execute Close().
	close(r.closeCh)

	// Wait for all p2p Channels to be closed before returning. This ensures we
	// can easily reason about synchronization of all p2p Channels and ensure no
	// panics will occur.
	<-r.snapshotCh.Done()
	<-r.chunkCh.Done()
	<-r.peerUpdates.Done()
}

// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
	logger := r.Logger.With("peer", envelope.From)

	switch msg := envelope.Message.(type) {
	case *ssproto.SnapshotsRequest:
		snapshots, err := r.recentSnapshots(recentSnapshots)
		if err != nil {
			logger.Error("failed to fetch snapshots", "err", err)
			return nil
		}

		for _, snapshot := range snapshots {
			logger.Debug(
				"advertising snapshot",
				"height", snapshot.Height,
				"format", snapshot.Format,
			)
			r.snapshotCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.SnapshotsResponse{
					Height:   snapshot.Height,
					Format:   snapshot.Format,
					Chunks:   snapshot.Chunks,
					Hash:     snapshot.Hash,
					Metadata: snapshot.Metadata,
				},
			}
		}

	case *ssproto.SnapshotsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			logger.Debug("received unexpected snapshot; no state sync in progress")
			return nil
		}

		logger.Debug("received snapshot", "height", msg.Height, "format", msg.Format)
		_, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
			Height:   msg.Height,
			Format:   msg.Format,
			Chunks:   msg.Chunks,
			Hash:     msg.Hash,
			Metadata: msg.Metadata,
		})
		if err != nil {
			logger.Error(
				"failed to add snapshot",
				"height", msg.Height,
				"format", msg.Format,
				"err", err,
				"channel", r.snapshotCh.ID,
			)
			return nil
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ChunkRequest:
		r.Logger.Debug(
			"received chunk request",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		resp, err := r.conn.LoadSnapshotChunkSync(context.Background(), abci.RequestLoadSnapshotChunk{
			Height: msg.Height,
			Format: msg.Format,
			Chunk:  msg.Index,
		})
		if err != nil {
			r.Logger.Error(
				"failed to load chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

		r.Logger.Debug(
			"sending chunk",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		r.chunkCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ChunkResponse{
				Height:  msg.Height,
				Format:  msg.Format,
				Index:   msg.Index,
				Chunk:   resp.Chunk,
				Missing: resp.Chunk == nil,
			},
		}

	case *ssproto.ChunkResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From)
			return nil
		}

		r.Logger.Debug(
			"received chunk; adding to sync",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		_, err := r.syncer.AddChunk(&chunk{
			Height: msg.Height,
			Format: msg.Format,
			Index:  msg.Index,
			Chunk:  msg.Chunk,
			Sender: envelope.From,
		})
		if err != nil {
			r.Logger.Error(
				"failed to add chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
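	// Recover from any panic raised while handling the envelope, so a malformed
	// message from a peer cannot crash the node; the panic is converted into an
	// error and returned to the caller.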
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic in processing message: %v", e)
		}
	}()

	r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)

	switch chID {
	case SnapshotChannel:
		err = r.handleSnapshotMessage(envelope)

	case ChunkChannel:
		err = r.handleChunkMessage(envelope)

	default:
		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
	}

	return err
}

// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel. Any error encountered during message
// execution will result in a PeerError being sent on the SnapshotChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processSnapshotCh() {
	defer r.snapshotCh.Close()

	for {
		select {
		case envelope := <-r.snapshotCh.In:
			if err := r.handleMessage(r.snapshotCh.ID, envelope); err != nil {
				r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID, "envelope", envelope, "err", err)
				r.snapshotCh.Error <- p2p.PeerError{
					NodeID: envelope.From,
					Err:    err,
				}
			}

		case <-r.closeCh:
			r.Logger.Debug("stopped listening on snapshot channel; closing...")
			return
		}
	}
}

// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel. Any error encountered during message
// execution will result in a PeerError being sent on the ChunkChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processChunkCh() {
	defer r.chunkCh.Close()

	for {
		select {
		case envelope := <-r.chunkCh.In:
			if err := r.handleMessage(r.chunkCh.ID, envelope); err != nil {
				r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID, "envelope", envelope, "err", err)
				r.chunkCh.Error <- p2p.PeerError{
					NodeID: envelope.From,
					Err:    err,
				}
			}

		case <-r.closeCh:
			r.Logger.Debug("stopped listening on chunk channel; closing...")
			return
		}
	}
}

// processPeerUpdate processes a PeerUpdate. If a state sync is in progress, the
// peer is added to or removed from the syncer according to its new status.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
	r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil {
		switch peerUpdate.Status {
		case p2p.PeerStatusUp:
			r.syncer.AddPeer(peerUpdate.NodeID)

		case p2p.PeerStatusDown:
			r.syncer.RemovePeer(peerUpdate.NodeID)
		}
	}
}

// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
	defer r.peerUpdates.Close()

	for {
		select {
		case peerUpdate := <-r.peerUpdates.Updates():
			r.processPeerUpdate(peerUpdate)

		case <-r.closeCh:
			r.Logger.Debug("stopped listening on peer updates channel; closing...")
			return
		}
	}
}

// recentSnapshots fetches the n most recent snapshots from the app.
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
	resp, err := r.conn.ListSnapshotsSync(context.Background(), abci.RequestListSnapshots{})
	if err != nil {
		return nil, err
	}

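	// Sort snapshots in descending order, first by height and then by format,
	// so that the most recent snapshots come first.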
	sort.Slice(resp.Snapshots, func(i, j int) bool {
		a := resp.Snapshots[i]
		b := resp.Snapshots[j]

		switch {
		case a.Height > b.Height:
			return true
		case a.Height == b.Height && a.Format > b.Format:
			return true
		default:
			return false
		}
	})

	snapshots := make([]*snapshot, 0, n)
	for i, s := range resp.Snapshots {
		if uint32(i) >= n {
			break
		}

		snapshots = append(snapshots, &snapshot{
			Height:   s.Height,
			Format:   s.Format,
			Chunks:   s.Chunks,
			Hash:     s.Hash,
			Metadata: s.Metadata,
		})
	}

	return snapshots, nil
}

// Sync runs a state sync, returning the new state and last commit at the snapshot height.
// The caller must store the state and commit in the state database and block store.
func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
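	// Install the syncer under the write lock so that concurrent Sync calls and
	// the message handlers (which read r.syncer under the read lock) observe a
	// consistent value, ensuring only one state sync runs at a time.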
	r.mtx.Lock()
	if r.syncer != nil {
		r.mtx.Unlock()
		return sm.State{}, nil, errors.New("a state sync is already in progress")
	}

	r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
	r.mtx.Unlock()

	// request snapshots from all currently connected peers
	r.Logger.Debug("requesting snapshots from known peers")
	r.snapshotCh.Out <- p2p.Envelope{
		Broadcast: true,
		Message:   &ssproto.SnapshotsRequest{},
	}

	state, commit, err := r.syncer.SyncAny(discoveryTime)

	r.mtx.Lock()
	r.syncer = nil
	r.mtx.Unlock()

	return state, commit, err
}
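
// Example (hypothetical caller; the StateProvider construction and the
// persistence of the returned state and commit are elided):
//
//	state, commit, err := r.Sync(stateProvider, 30*time.Second)
//	if err != nil {
//		return err
//	}
//	// Store state in the state database and commit in the block store before
//	// starting consensus.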