You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
7.0 KiB

  1. package statesync
import (
	"errors"
	"fmt"
	"sort"

	abci "github.com/tendermint/tendermint/abci/types"
	tmsync "github.com/tendermint/tendermint/libs/sync"
	"github.com/tendermint/tendermint/p2p"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)
  13. const (
  14. // SnapshotChannel exchanges snapshot metadata
  15. SnapshotChannel = byte(0x60)
  16. // ChunkChannel exchanges chunk contents
  17. ChunkChannel = byte(0x61)
  18. // recentSnapshots is the number of recent snapshots to send and receive per peer.
  19. recentSnapshots = 10
  20. )
// Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
// for other nodes.
type Reactor struct {
	p2p.BaseReactor

	// conn is the ABCI snapshot connection used to list snapshots and load chunks.
	conn proxy.AppConnSnapshot
	// connQuery is handed to the syncer created by Sync.
	connQuery proxy.AppConnQuery
	// tempDir is handed to the syncer created by Sync.
	tempDir string

	// mtx guards syncer. syncer will only be set when a state sync is in progress. It is
	// used to feed received snapshots and chunks into the sync.
	mtx    tmsync.RWMutex
	syncer *syncer
}
  33. // NewReactor creates a new state sync reactor.
  34. func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
  35. r := &Reactor{
  36. conn: conn,
  37. connQuery: connQuery,
  38. }
  39. r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
  40. return r
  41. }
  42. // GetChannels implements p2p.Reactor.
  43. func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
  44. return []*p2p.ChannelDescriptor{
  45. {
  46. ID: SnapshotChannel,
  47. Priority: 3,
  48. SendQueueCapacity: 10,
  49. RecvMessageCapacity: snapshotMsgSize,
  50. },
  51. {
  52. ID: ChunkChannel,
  53. Priority: 1,
  54. SendQueueCapacity: 4,
  55. RecvMessageCapacity: chunkMsgSize,
  56. },
  57. }
  58. }
// OnStart implements p2p.Reactor. The reactor performs no work on start and always
// returns nil.
func (r *Reactor) OnStart() error {
	return nil
}
  63. // AddPeer implements p2p.Reactor.
  64. func (r *Reactor) AddPeer(peer p2p.Peer) {
  65. r.mtx.RLock()
  66. defer r.mtx.RUnlock()
  67. if r.syncer != nil {
  68. r.syncer.AddPeer(peer)
  69. }
  70. }
  71. // RemovePeer implements p2p.Reactor.
  72. func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  73. r.mtx.RLock()
  74. defer r.mtx.RUnlock()
  75. if r.syncer != nil {
  76. r.syncer.RemovePeer(peer)
  77. }
  78. }
  79. // Receive implements p2p.Reactor.
  80. func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  81. if !r.IsRunning() {
  82. return
  83. }
  84. msg, err := decodeMsg(msgBytes)
  85. if err != nil {
  86. r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  87. r.Switch.StopPeerForError(src, err)
  88. return
  89. }
  90. err = validateMsg(msg)
  91. if err != nil {
  92. r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
  93. r.Switch.StopPeerForError(src, err)
  94. return
  95. }
  96. switch chID {
  97. case SnapshotChannel:
  98. switch msg := msg.(type) {
  99. case *ssproto.SnapshotsRequest:
  100. snapshots, err := r.recentSnapshots(recentSnapshots)
  101. if err != nil {
  102. r.Logger.Error("Failed to fetch snapshots", "err", err)
  103. return
  104. }
  105. for _, snapshot := range snapshots {
  106. r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
  107. "format", snapshot.Format, "peer", src.ID())
  108. src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
  109. Height: snapshot.Height,
  110. Format: snapshot.Format,
  111. Chunks: snapshot.Chunks,
  112. Hash: snapshot.Hash,
  113. Metadata: snapshot.Metadata,
  114. }))
  115. }
  116. case *ssproto.SnapshotsResponse:
  117. r.mtx.RLock()
  118. defer r.mtx.RUnlock()
  119. if r.syncer == nil {
  120. r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
  121. return
  122. }
  123. r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
  124. _, err := r.syncer.AddSnapshot(src, &snapshot{
  125. Height: msg.Height,
  126. Format: msg.Format,
  127. Chunks: msg.Chunks,
  128. Hash: msg.Hash,
  129. Metadata: msg.Metadata,
  130. })
  131. if err != nil {
  132. r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
  133. "peer", src.ID(), "err", err)
  134. return
  135. }
  136. default:
  137. r.Logger.Error("Received unknown message %T", msg)
  138. }
  139. case ChunkChannel:
  140. switch msg := msg.(type) {
  141. case *ssproto.ChunkRequest:
  142. r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
  143. "chunk", msg.Index, "peer", src.ID())
  144. resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
  145. Height: msg.Height,
  146. Format: msg.Format,
  147. Chunk: msg.Index,
  148. })
  149. if err != nil {
  150. r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format,
  151. "chunk", msg.Index, "err", err)
  152. return
  153. }
  154. r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
  155. "chunk", msg.Index, "peer", src.ID())
  156. src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
  157. Height: msg.Height,
  158. Format: msg.Format,
  159. Index: msg.Index,
  160. Chunk: resp.Chunk,
  161. Missing: resp.Chunk == nil,
  162. }))
  163. case *ssproto.ChunkResponse:
  164. r.mtx.RLock()
  165. defer r.mtx.RUnlock()
  166. if r.syncer == nil {
  167. r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
  168. return
  169. }
  170. r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
  171. "chunk", msg.Index, "peer", src.ID())
  172. _, err := r.syncer.AddChunk(&chunk{
  173. Height: msg.Height,
  174. Format: msg.Format,
  175. Index: msg.Index,
  176. Chunk: msg.Chunk,
  177. Sender: src.ID(),
  178. })
  179. if err != nil {
  180. r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
  181. "chunk", msg.Index, "err", err)
  182. return
  183. }
  184. default:
  185. r.Logger.Error("Received unknown message %T", msg)
  186. }
  187. default:
  188. r.Logger.Error("Received message on invalid channel %x", chID)
  189. }
  190. }
  191. // recentSnapshots fetches the n most recent snapshots from the app
  192. func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
  193. resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})
  194. if err != nil {
  195. return nil, err
  196. }
  197. sort.Slice(resp.Snapshots, func(i, j int) bool {
  198. a := resp.Snapshots[i]
  199. b := resp.Snapshots[j]
  200. switch {
  201. case a.Height > b.Height:
  202. return true
  203. case a.Height == b.Height && a.Format > b.Format:
  204. return true
  205. default:
  206. return false
  207. }
  208. })
  209. snapshots := make([]*snapshot, 0, n)
  210. for i, s := range resp.Snapshots {
  211. if i >= recentSnapshots {
  212. break
  213. }
  214. snapshots = append(snapshots, &snapshot{
  215. Height: s.Height,
  216. Format: s.Format,
  217. Chunks: s.Chunks,
  218. Hash: s.Hash,
  219. Metadata: s.Metadata,
  220. })
  221. }
  222. return snapshots, nil
  223. }
  224. // Sync runs a state sync, returning the new state and last commit at the snapshot height.
  225. // The caller must store the state and commit in the state database and block store.
  226. func (r *Reactor) Sync(stateProvider StateProvider) (sm.State, *types.Commit, error) {
  227. r.mtx.Lock()
  228. if r.syncer != nil {
  229. r.mtx.Unlock()
  230. return sm.State{}, nil, errors.New("a state sync is already in progress")
  231. }
  232. r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
  233. r.mtx.Unlock()
  234. state, commit, err := r.syncer.SyncAny(defaultDiscoveryTime)
  235. r.mtx.Lock()
  236. r.syncer = nil
  237. r.mtx.Unlock()
  238. return state, commit, err
  239. }