You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
6.9 KiB

  1. package statesync
import (
	"errors"
	"fmt"
	"sort"
	"sync"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)
  12. const (
  13. // SnapshotChannel exchanges snapshot metadata
  14. SnapshotChannel = byte(0x60)
  15. // ChunkChannel exchanges chunk contents
  16. ChunkChannel = byte(0x61)
  17. // recentSnapshots is the number of recent snapshots to send and receive per peer.
  18. recentSnapshots = 10
  19. )
// Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
// for other nodes.
type Reactor struct {
	p2p.BaseReactor
	// conn is the ABCI snapshot connection, used to list snapshots and load chunks for peers.
	conn proxy.AppConnSnapshot
	// connQuery is handed to the syncer when a sync is started.
	connQuery proxy.AppConnQuery
	// tempDir is handed to the syncer when a sync is started.
	tempDir string
	// mtx guards syncer.
	mtx sync.RWMutex
	// syncer will only be set when a state sync is in progress. It is used to feed received
	// snapshots and chunks into the sync.
	syncer *syncer
}
  32. // NewReactor creates a new state sync reactor.
  33. func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
  34. r := &Reactor{
  35. conn: conn,
  36. connQuery: connQuery,
  37. }
  38. r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
  39. return r
  40. }
  41. // GetChannels implements p2p.Reactor.
  42. func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
  43. return []*p2p.ChannelDescriptor{
  44. {
  45. ID: SnapshotChannel,
  46. Priority: 3,
  47. SendQueueCapacity: 10,
  48. RecvMessageCapacity: snapshotMsgSize,
  49. },
  50. {
  51. ID: ChunkChannel,
  52. Priority: 1,
  53. SendQueueCapacity: 4,
  54. RecvMessageCapacity: chunkMsgSize,
  55. },
  56. }
  57. }
// OnStart implements p2p.Reactor. It performs no work and always returns nil.
func (r *Reactor) OnStart() error {
	return nil
}
  62. // AddPeer implements p2p.Reactor.
  63. func (r *Reactor) AddPeer(peer p2p.Peer) {
  64. r.mtx.RLock()
  65. defer r.mtx.RUnlock()
  66. if r.syncer != nil {
  67. r.syncer.AddPeer(peer)
  68. }
  69. }
  70. // RemovePeer implements p2p.Reactor.
  71. func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  72. r.mtx.RLock()
  73. defer r.mtx.RUnlock()
  74. if r.syncer != nil {
  75. r.syncer.RemovePeer(peer)
  76. }
  77. }
  78. // Receive implements p2p.Reactor.
  79. func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  80. if !r.IsRunning() {
  81. return
  82. }
  83. msg, err := decodeMsg(msgBytes)
  84. if err != nil {
  85. r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  86. r.Switch.StopPeerForError(src, err)
  87. return
  88. }
  89. err = msg.ValidateBasic()
  90. if err != nil {
  91. r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
  92. r.Switch.StopPeerForError(src, err)
  93. return
  94. }
  95. switch chID {
  96. case SnapshotChannel:
  97. switch msg := msg.(type) {
  98. case *snapshotsRequestMessage:
  99. snapshots, err := r.recentSnapshots(recentSnapshots)
  100. if err != nil {
  101. r.Logger.Error("Failed to fetch snapshots", "err", err)
  102. return
  103. }
  104. for _, snapshot := range snapshots {
  105. r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
  106. "format", snapshot.Format, "peer", src.ID())
  107. src.Send(chID, cdc.MustMarshalBinaryBare(&snapshotsResponseMessage{
  108. Height: snapshot.Height,
  109. Format: snapshot.Format,
  110. Chunks: snapshot.Chunks,
  111. Hash: snapshot.Hash,
  112. Metadata: snapshot.Metadata,
  113. }))
  114. }
  115. case *snapshotsResponseMessage:
  116. r.mtx.RLock()
  117. defer r.mtx.RUnlock()
  118. if r.syncer == nil {
  119. r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
  120. return
  121. }
  122. r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
  123. _, err := r.syncer.AddSnapshot(src, &snapshot{
  124. Height: msg.Height,
  125. Format: msg.Format,
  126. Chunks: msg.Chunks,
  127. Hash: msg.Hash,
  128. Metadata: msg.Metadata,
  129. })
  130. if err != nil {
  131. r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
  132. "peer", src.ID(), "err", err)
  133. return
  134. }
  135. default:
  136. r.Logger.Error("Received unknown message %T", msg)
  137. }
  138. case ChunkChannel:
  139. switch msg := msg.(type) {
  140. case *chunkRequestMessage:
  141. r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
  142. "chunk", msg.Index, "peer", src.ID())
  143. resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
  144. Height: msg.Height,
  145. Format: msg.Format,
  146. Chunk: msg.Index,
  147. })
  148. if err != nil {
  149. r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format,
  150. "chunk", msg.Index, "err", err)
  151. return
  152. }
  153. r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
  154. "chunk", msg.Index, "peer", src.ID())
  155. src.Send(ChunkChannel, cdc.MustMarshalBinaryBare(&chunkResponseMessage{
  156. Height: msg.Height,
  157. Format: msg.Format,
  158. Index: msg.Index,
  159. Chunk: resp.Chunk,
  160. Missing: resp.Chunk == nil,
  161. }))
  162. case *chunkResponseMessage:
  163. r.mtx.RLock()
  164. defer r.mtx.RUnlock()
  165. if r.syncer == nil {
  166. r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
  167. return
  168. }
  169. r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
  170. "chunk", msg.Index, "peer", src.ID())
  171. _, err := r.syncer.AddChunk(&chunk{
  172. Height: msg.Height,
  173. Format: msg.Format,
  174. Index: msg.Index,
  175. Chunk: msg.Chunk,
  176. Sender: src.ID(),
  177. })
  178. if err != nil {
  179. r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
  180. "chunk", msg.Index, "err", err)
  181. return
  182. }
  183. default:
  184. r.Logger.Error("Received unknown message %T", msg)
  185. }
  186. default:
  187. r.Logger.Error("Received message on invalid channel %x", chID)
  188. }
  189. }
  190. // recentSnapshots fetches the n most recent snapshots from the app
  191. func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
  192. resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})
  193. if err != nil {
  194. return nil, err
  195. }
  196. sort.Slice(resp.Snapshots, func(i, j int) bool {
  197. a := resp.Snapshots[i]
  198. b := resp.Snapshots[j]
  199. switch {
  200. case a.Height > b.Height:
  201. return true
  202. case a.Height == b.Height && a.Format > b.Format:
  203. return true
  204. default:
  205. return false
  206. }
  207. })
  208. snapshots := make([]*snapshot, 0, n)
  209. for i, s := range resp.Snapshots {
  210. if i >= recentSnapshots {
  211. break
  212. }
  213. snapshots = append(snapshots, &snapshot{
  214. Height: s.Height,
  215. Format: s.Format,
  216. Chunks: s.Chunks,
  217. Hash: s.Hash,
  218. Metadata: s.Metadata,
  219. })
  220. }
  221. return snapshots, nil
  222. }
  223. // Sync runs a state sync, returning the new state and last commit at the snapshot height.
  224. // The caller must store the state and commit in the state database and block store.
  225. func (r *Reactor) Sync(stateProvider StateProvider) (sm.State, *types.Commit, error) {
  226. r.mtx.Lock()
  227. if r.syncer != nil {
  228. r.mtx.Unlock()
  229. return sm.State{}, nil, errors.New("a state sync is already in progress")
  230. }
  231. r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
  232. r.mtx.Unlock()
  233. state, commit, err := r.syncer.SyncAny(defaultDiscoveryTime)
  234. r.mtx.Lock()
  235. r.syncer = nil
  236. r.mtx.Unlock()
  237. return state, commit, err
  238. }