You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

267 lines
7.2 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "sort"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. tmsync "github.com/tendermint/tendermint/libs/sync"
  8. "github.com/tendermint/tendermint/p2p"
  9. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  10. "github.com/tendermint/tendermint/proxy"
  11. sm "github.com/tendermint/tendermint/state"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. // SnapshotChannel exchanges snapshot metadata
  16. SnapshotChannel = byte(0x60)
  17. // ChunkChannel exchanges chunk contents
  18. ChunkChannel = byte(0x61)
  19. // recentSnapshots is the number of recent snapshots to send and receive per peer.
  20. recentSnapshots = 10
  21. )
  22. // Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
  23. // for other nodes.
  24. type Reactor struct {
  25. p2p.BaseReactor
  26. conn proxy.AppConnSnapshot
  27. connQuery proxy.AppConnQuery
  28. tempDir string
  29. // This will only be set when a state sync is in progress. It is used to feed received
  30. // snapshots and chunks into the sync.
  31. mtx tmsync.RWMutex
  32. syncer *syncer
  33. }
  34. // NewReactor creates a new state sync reactor.
  35. func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
  36. r := &Reactor{
  37. conn: conn,
  38. connQuery: connQuery,
  39. }
  40. r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
  41. return r
  42. }
  43. // GetChannels implements p2p.Reactor.
  44. func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
  45. return []*p2p.ChannelDescriptor{
  46. {
  47. ID: SnapshotChannel,
  48. Priority: 3,
  49. SendQueueCapacity: 10,
  50. RecvMessageCapacity: snapshotMsgSize,
  51. },
  52. {
  53. ID: ChunkChannel,
  54. Priority: 1,
  55. SendQueueCapacity: 4,
  56. RecvMessageCapacity: chunkMsgSize,
  57. },
  58. }
  59. }
  60. // OnStart implements p2p.Reactor.
  61. func (r *Reactor) OnStart() error {
  62. return nil
  63. }
  64. // AddPeer implements p2p.Reactor.
  65. func (r *Reactor) AddPeer(peer p2p.Peer) {
  66. r.mtx.RLock()
  67. defer r.mtx.RUnlock()
  68. if r.syncer != nil {
  69. r.syncer.AddPeer(peer)
  70. }
  71. }
  72. // RemovePeer implements p2p.Reactor.
  73. func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  74. r.mtx.RLock()
  75. defer r.mtx.RUnlock()
  76. if r.syncer != nil {
  77. r.syncer.RemovePeer(peer)
  78. }
  79. }
  80. // Receive implements p2p.Reactor.
  81. func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  82. if !r.IsRunning() {
  83. return
  84. }
  85. msg, err := decodeMsg(msgBytes)
  86. if err != nil {
  87. r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  88. r.Switch.StopPeerForError(src, err)
  89. return
  90. }
  91. err = validateMsg(msg)
  92. if err != nil {
  93. r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
  94. r.Switch.StopPeerForError(src, err)
  95. return
  96. }
  97. switch chID {
  98. case SnapshotChannel:
  99. switch msg := msg.(type) {
  100. case *ssproto.SnapshotsRequest:
  101. snapshots, err := r.recentSnapshots(recentSnapshots)
  102. if err != nil {
  103. r.Logger.Error("Failed to fetch snapshots", "err", err)
  104. return
  105. }
  106. for _, snapshot := range snapshots {
  107. r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
  108. "format", snapshot.Format, "peer", src.ID())
  109. src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
  110. Height: snapshot.Height,
  111. Format: snapshot.Format,
  112. Chunks: snapshot.Chunks,
  113. Hash: snapshot.Hash,
  114. Metadata: snapshot.Metadata,
  115. }))
  116. }
  117. case *ssproto.SnapshotsResponse:
  118. r.mtx.RLock()
  119. defer r.mtx.RUnlock()
  120. if r.syncer == nil {
  121. r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
  122. return
  123. }
  124. r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
  125. _, err := r.syncer.AddSnapshot(src, &snapshot{
  126. Height: msg.Height,
  127. Format: msg.Format,
  128. Chunks: msg.Chunks,
  129. Hash: msg.Hash,
  130. Metadata: msg.Metadata,
  131. })
  132. if err != nil {
  133. r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
  134. "peer", src.ID(), "err", err)
  135. return
  136. }
  137. default:
  138. r.Logger.Error("Received unknown message %T", msg)
  139. }
  140. case ChunkChannel:
  141. switch msg := msg.(type) {
  142. case *ssproto.ChunkRequest:
  143. r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
  144. "chunk", msg.Index, "peer", src.ID())
  145. resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
  146. Height: msg.Height,
  147. Format: msg.Format,
  148. Chunk: msg.Index,
  149. })
  150. if err != nil {
  151. r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format,
  152. "chunk", msg.Index, "err", err)
  153. return
  154. }
  155. r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
  156. "chunk", msg.Index, "peer", src.ID())
  157. src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
  158. Height: msg.Height,
  159. Format: msg.Format,
  160. Index: msg.Index,
  161. Chunk: resp.Chunk,
  162. Missing: resp.Chunk == nil,
  163. }))
  164. case *ssproto.ChunkResponse:
  165. r.mtx.RLock()
  166. defer r.mtx.RUnlock()
  167. if r.syncer == nil {
  168. r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
  169. return
  170. }
  171. r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
  172. "chunk", msg.Index, "peer", src.ID())
  173. _, err := r.syncer.AddChunk(&chunk{
  174. Height: msg.Height,
  175. Format: msg.Format,
  176. Index: msg.Index,
  177. Chunk: msg.Chunk,
  178. Sender: src.ID(),
  179. })
  180. if err != nil {
  181. r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
  182. "chunk", msg.Index, "err", err)
  183. return
  184. }
  185. default:
  186. r.Logger.Error("Received unknown message %T", msg)
  187. }
  188. default:
  189. r.Logger.Error("Received message on invalid channel %x", chID)
  190. }
  191. }
  192. // recentSnapshots fetches the n most recent snapshots from the app
  193. func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
  194. resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})
  195. if err != nil {
  196. return nil, err
  197. }
  198. sort.Slice(resp.Snapshots, func(i, j int) bool {
  199. a := resp.Snapshots[i]
  200. b := resp.Snapshots[j]
  201. switch {
  202. case a.Height > b.Height:
  203. return true
  204. case a.Height == b.Height && a.Format > b.Format:
  205. return true
  206. default:
  207. return false
  208. }
  209. })
  210. snapshots := make([]*snapshot, 0, n)
  211. for i, s := range resp.Snapshots {
  212. if i >= recentSnapshots {
  213. break
  214. }
  215. snapshots = append(snapshots, &snapshot{
  216. Height: s.Height,
  217. Format: s.Format,
  218. Chunks: s.Chunks,
  219. Hash: s.Hash,
  220. Metadata: s.Metadata,
  221. })
  222. }
  223. return snapshots, nil
  224. }
  225. // Sync runs a state sync, returning the new state and last commit at the snapshot height.
  226. // The caller must store the state and commit in the state database and block store.
  227. func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
  228. r.mtx.Lock()
  229. if r.syncer != nil {
  230. r.mtx.Unlock()
  231. return sm.State{}, nil, errors.New("a state sync is already in progress")
  232. }
  233. r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
  234. r.mtx.Unlock()
  235. // Request snapshots from all currently connected peers
  236. r.Logger.Debug("Requesting snapshots from known peers")
  237. r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
  238. state, commit, err := r.syncer.SyncAny(discoveryTime)
  239. r.mtx.Lock()
  240. r.syncer = nil
  241. r.mtx.Unlock()
  242. return state, commit, err
  243. }