You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

269 lines
7.4 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "sort"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. tmsync "github.com/tendermint/tendermint/libs/sync"
  8. "github.com/tendermint/tendermint/p2p"
  9. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  10. "github.com/tendermint/tendermint/proxy"
  11. sm "github.com/tendermint/tendermint/state"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. // SnapshotChannel exchanges snapshot metadata
  16. SnapshotChannel = byte(0x60)
  17. // ChunkChannel exchanges chunk contents
  18. ChunkChannel = byte(0x61)
  19. // recentSnapshots is the number of recent snapshots to send and receive per peer.
  20. recentSnapshots = 10
  21. )
  22. // Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
  23. // for other nodes.
  24. type Reactor struct {
  25. p2p.BaseReactor
  26. conn proxy.AppConnSnapshot
  27. connQuery proxy.AppConnQuery
  28. tempDir string
  29. // This will only be set when a state sync is in progress. It is used to feed received
  30. // snapshots and chunks into the sync.
  31. mtx tmsync.RWMutex
  32. syncer *syncer
  33. }
  34. // NewReactor creates a new state sync reactor.
  35. func NewReactor(conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, tempDir string) *Reactor {
  36. r := &Reactor{
  37. conn: conn,
  38. connQuery: connQuery,
  39. }
  40. r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
  41. return r
  42. }
  43. // GetChannels implements p2p.Reactor.
  44. func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
  45. return []*p2p.ChannelDescriptor{
  46. {
  47. ID: SnapshotChannel,
  48. Priority: 3,
  49. SendQueueCapacity: 10,
  50. RecvMessageCapacity: snapshotMsgSize,
  51. },
  52. {
  53. ID: ChunkChannel,
  54. Priority: 1,
  55. SendQueueCapacity: 4,
  56. RecvMessageCapacity: chunkMsgSize,
  57. },
  58. }
  59. }
  60. // OnStart implements p2p.Reactor.
  61. func (r *Reactor) OnStart() error {
  62. return nil
  63. }
  64. // AddPeer implements p2p.Reactor.
  65. func (r *Reactor) AddPeer(peer p2p.Peer) {
  66. r.mtx.RLock()
  67. defer r.mtx.RUnlock()
  68. if r.syncer != nil {
  69. r.syncer.AddPeer(peer)
  70. }
  71. }
  72. // RemovePeer implements p2p.Reactor.
  73. func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  74. r.mtx.RLock()
  75. defer r.mtx.RUnlock()
  76. if r.syncer != nil {
  77. r.syncer.RemovePeer(peer)
  78. }
  79. }
  80. // Receive implements p2p.Reactor.
  81. // XXX: do not call any methods that can block or incur heavy processing.
  82. // https://github.com/tendermint/tendermint/issues/2888
  83. func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  84. if !r.IsRunning() {
  85. return
  86. }
  87. msg, err := decodeMsg(msgBytes)
  88. if err != nil {
  89. r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  90. r.Switch.StopPeerForError(src, err)
  91. return
  92. }
  93. err = validateMsg(msg)
  94. if err != nil {
  95. r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
  96. r.Switch.StopPeerForError(src, err)
  97. return
  98. }
  99. switch chID {
  100. case SnapshotChannel:
  101. switch msg := msg.(type) {
  102. case *ssproto.SnapshotsRequest:
  103. snapshots, err := r.recentSnapshots(recentSnapshots)
  104. if err != nil {
  105. r.Logger.Error("Failed to fetch snapshots", "err", err)
  106. return
  107. }
  108. for _, snapshot := range snapshots {
  109. r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
  110. "format", snapshot.Format, "peer", src.ID())
  111. src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
  112. Height: snapshot.Height,
  113. Format: snapshot.Format,
  114. Chunks: snapshot.Chunks,
  115. Hash: snapshot.Hash,
  116. Metadata: snapshot.Metadata,
  117. }))
  118. }
  119. case *ssproto.SnapshotsResponse:
  120. r.mtx.RLock()
  121. defer r.mtx.RUnlock()
  122. if r.syncer == nil {
  123. r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
  124. return
  125. }
  126. r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
  127. _, err := r.syncer.AddSnapshot(src, &snapshot{
  128. Height: msg.Height,
  129. Format: msg.Format,
  130. Chunks: msg.Chunks,
  131. Hash: msg.Hash,
  132. Metadata: msg.Metadata,
  133. })
  134. if err != nil {
  135. r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
  136. "peer", src.ID(), "err", err)
  137. return
  138. }
  139. default:
  140. r.Logger.Error("Received unknown message %T", msg)
  141. }
  142. case ChunkChannel:
  143. switch msg := msg.(type) {
  144. case *ssproto.ChunkRequest:
  145. r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
  146. "chunk", msg.Index, "peer", src.ID())
  147. resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
  148. Height: msg.Height,
  149. Format: msg.Format,
  150. Chunk: msg.Index,
  151. })
  152. if err != nil {
  153. r.Logger.Error("Failed to load chunk", "height", msg.Height, "format", msg.Format,
  154. "chunk", msg.Index, "err", err)
  155. return
  156. }
  157. r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
  158. "chunk", msg.Index, "peer", src.ID())
  159. src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
  160. Height: msg.Height,
  161. Format: msg.Format,
  162. Index: msg.Index,
  163. Chunk: resp.Chunk,
  164. Missing: resp.Chunk == nil,
  165. }))
  166. case *ssproto.ChunkResponse:
  167. r.mtx.RLock()
  168. defer r.mtx.RUnlock()
  169. if r.syncer == nil {
  170. r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
  171. return
  172. }
  173. r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
  174. "chunk", msg.Index, "peer", src.ID())
  175. _, err := r.syncer.AddChunk(&chunk{
  176. Height: msg.Height,
  177. Format: msg.Format,
  178. Index: msg.Index,
  179. Chunk: msg.Chunk,
  180. Sender: src.ID(),
  181. })
  182. if err != nil {
  183. r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
  184. "chunk", msg.Index, "err", err)
  185. return
  186. }
  187. default:
  188. r.Logger.Error("Received unknown message %T", msg)
  189. }
  190. default:
  191. r.Logger.Error("Received message on invalid channel %x", chID)
  192. }
  193. }
  194. // recentSnapshots fetches the n most recent snapshots from the app
  195. func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
  196. resp, err := r.conn.ListSnapshotsSync(abci.RequestListSnapshots{})
  197. if err != nil {
  198. return nil, err
  199. }
  200. sort.Slice(resp.Snapshots, func(i, j int) bool {
  201. a := resp.Snapshots[i]
  202. b := resp.Snapshots[j]
  203. switch {
  204. case a.Height > b.Height:
  205. return true
  206. case a.Height == b.Height && a.Format > b.Format:
  207. return true
  208. default:
  209. return false
  210. }
  211. })
  212. snapshots := make([]*snapshot, 0, n)
  213. for i, s := range resp.Snapshots {
  214. if i >= recentSnapshots {
  215. break
  216. }
  217. snapshots = append(snapshots, &snapshot{
  218. Height: s.Height,
  219. Format: s.Format,
  220. Chunks: s.Chunks,
  221. Hash: s.Hash,
  222. Metadata: s.Metadata,
  223. })
  224. }
  225. return snapshots, nil
  226. }
  227. // Sync runs a state sync, returning the new state and last commit at the snapshot height.
  228. // The caller must store the state and commit in the state database and block store.
  229. func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
  230. r.mtx.Lock()
  231. if r.syncer != nil {
  232. r.mtx.Unlock()
  233. return sm.State{}, nil, errors.New("a state sync is already in progress")
  234. }
  235. r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
  236. r.mtx.Unlock()
  237. // Request snapshots from all currently connected peers
  238. r.Logger.Debug("Requesting snapshots from known peers")
  239. r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
  240. state, commit, err := r.syncer.SyncAny(discoveryTime)
  241. r.mtx.Lock()
  242. r.syncer = nil
  243. r.mtx.Unlock()
  244. return state, commit, err
  245. }