You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

357 lines
8.7 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strconv"
  9. "time"
  10. tmsync "github.com/tendermint/tendermint/libs/sync"
  11. "github.com/tendermint/tendermint/p2p"
  12. )
  13. // errDone is returned by chunkQueue.Next() when all chunks have been returned.
  14. var errDone = errors.New("chunk queue has completed")
  15. // chunk contains data for a chunk.
  16. type chunk struct {
  17. Height uint64
  18. Format uint32
  19. Index uint32
  20. Chunk []byte
  21. Sender p2p.PeerID
  22. }
  23. // chunkQueue manages chunks for a state sync process, ordering them if requested. It acts as an
  24. // iterator over all chunks, but callers can request chunks to be retried, optionally after
  25. // refetching.
  26. type chunkQueue struct {
  27. tmsync.Mutex
  28. snapshot *snapshot // if this is nil, the queue has been closed
  29. dir string // temp dir for on-disk chunk storage
  30. chunkFiles map[uint32]string // path to temporary chunk file
  31. chunkSenders map[uint32]p2p.PeerID // the peer who sent the given chunk
  32. chunkAllocated map[uint32]bool // chunks that have been allocated via Allocate()
  33. chunkReturned map[uint32]bool // chunks returned via Next()
  34. waiters map[uint32][]chan<- uint32 // signals WaitFor() waiters about chunk arrival
  35. }
  36. // newChunkQueue creates a new chunk queue for a snapshot, using a temp dir for storage.
  37. // Callers must call Close() when done.
  38. func newChunkQueue(snapshot *snapshot, tempDir string) (*chunkQueue, error) {
  39. dir, err := ioutil.TempDir(tempDir, "tm-statesync")
  40. if err != nil {
  41. return nil, fmt.Errorf("unable to create temp dir for state sync chunks: %w", err)
  42. }
  43. if snapshot.Chunks == 0 {
  44. return nil, errors.New("snapshot has no chunks")
  45. }
  46. return &chunkQueue{
  47. snapshot: snapshot,
  48. dir: dir,
  49. chunkFiles: make(map[uint32]string, snapshot.Chunks),
  50. chunkSenders: make(map[uint32]p2p.PeerID, snapshot.Chunks),
  51. chunkAllocated: make(map[uint32]bool, snapshot.Chunks),
  52. chunkReturned: make(map[uint32]bool, snapshot.Chunks),
  53. waiters: make(map[uint32][]chan<- uint32),
  54. }, nil
  55. }
  56. // Add adds a chunk to the queue. It ignores chunks that already exist, returning false.
  57. func (q *chunkQueue) Add(chunk *chunk) (bool, error) {
  58. if chunk == nil || chunk.Chunk == nil {
  59. return false, errors.New("cannot add nil chunk")
  60. }
  61. q.Lock()
  62. defer q.Unlock()
  63. if q.snapshot == nil {
  64. return false, nil // queue is closed
  65. }
  66. if chunk.Height != q.snapshot.Height {
  67. return false, fmt.Errorf("invalid chunk height %v, expected %v", chunk.Height, q.snapshot.Height)
  68. }
  69. if chunk.Format != q.snapshot.Format {
  70. return false, fmt.Errorf("invalid chunk format %v, expected %v", chunk.Format, q.snapshot.Format)
  71. }
  72. if chunk.Index >= q.snapshot.Chunks {
  73. return false, fmt.Errorf("received unexpected chunk %v", chunk.Index)
  74. }
  75. if q.chunkFiles[chunk.Index] != "" {
  76. return false, nil
  77. }
  78. path := filepath.Join(q.dir, strconv.FormatUint(uint64(chunk.Index), 10))
  79. err := ioutil.WriteFile(path, chunk.Chunk, 0600)
  80. if err != nil {
  81. return false, fmt.Errorf("failed to save chunk %v to file %v: %w", chunk.Index, path, err)
  82. }
  83. q.chunkFiles[chunk.Index] = path
  84. q.chunkSenders[chunk.Index] = chunk.Sender
  85. // Signal any waiters that the chunk has arrived.
  86. for _, waiter := range q.waiters[chunk.Index] {
  87. waiter <- chunk.Index
  88. close(waiter)
  89. }
  90. delete(q.waiters, chunk.Index)
  91. return true, nil
  92. }
  93. // Allocate allocates a chunk to the caller, making it responsible for fetching it. Returns
  94. // errDone once no chunks are left or the queue is closed.
  95. func (q *chunkQueue) Allocate() (uint32, error) {
  96. q.Lock()
  97. defer q.Unlock()
  98. if q.snapshot == nil {
  99. return 0, errDone
  100. }
  101. if uint32(len(q.chunkAllocated)) >= q.snapshot.Chunks {
  102. return 0, errDone
  103. }
  104. for i := uint32(0); i < q.snapshot.Chunks; i++ {
  105. if !q.chunkAllocated[i] {
  106. q.chunkAllocated[i] = true
  107. return i, nil
  108. }
  109. }
  110. return 0, errDone
  111. }
  112. // Close closes the chunk queue, cleaning up all temporary files.
  113. func (q *chunkQueue) Close() error {
  114. q.Lock()
  115. defer q.Unlock()
  116. if q.snapshot == nil {
  117. return nil
  118. }
  119. for _, waiters := range q.waiters {
  120. for _, waiter := range waiters {
  121. close(waiter)
  122. }
  123. }
  124. q.waiters = nil
  125. q.snapshot = nil
  126. if err := os.RemoveAll(q.dir); err != nil {
  127. return fmt.Errorf("failed to clean up state sync tempdir %v: %w", q.dir, err)
  128. }
  129. return nil
  130. }
  131. // Discard discards a chunk. It will be removed from the queue, available for allocation, and can
  132. // be added and returned via Next() again. If the chunk is not already in the queue this does
  133. // nothing, to avoid it being allocated to multiple fetchers.
  134. func (q *chunkQueue) Discard(index uint32) error {
  135. q.Lock()
  136. defer q.Unlock()
  137. return q.discard(index)
  138. }
  139. // discard discards a chunk, scheduling it for refetching. The caller must hold the mutex lock.
  140. func (q *chunkQueue) discard(index uint32) error {
  141. if q.snapshot == nil {
  142. return nil
  143. }
  144. path := q.chunkFiles[index]
  145. if path == "" {
  146. return nil
  147. }
  148. if err := os.Remove(path); err != nil {
  149. return fmt.Errorf("failed to remove chunk %v: %w", index, err)
  150. }
  151. delete(q.chunkFiles, index)
  152. delete(q.chunkReturned, index)
  153. delete(q.chunkAllocated, index)
  154. return nil
  155. }
  156. // DiscardSender discards all *unreturned* chunks from a given sender. If the caller wants to
  157. // discard already returned chunks, this can be done via Discard().
  158. func (q *chunkQueue) DiscardSender(peerID p2p.PeerID) error {
  159. q.Lock()
  160. defer q.Unlock()
  161. for index, sender := range q.chunkSenders {
  162. if sender.Equal(peerID) && !q.chunkReturned[index] {
  163. err := q.discard(index)
  164. if err != nil {
  165. return err
  166. }
  167. delete(q.chunkSenders, index)
  168. }
  169. }
  170. return nil
  171. }
  172. // GetSender returns the sender of the chunk with the given index, or empty if
  173. // not found.
  174. func (q *chunkQueue) GetSender(index uint32) p2p.PeerID {
  175. q.Lock()
  176. defer q.Unlock()
  177. return q.chunkSenders[index]
  178. }
  179. // Has checks whether a chunk exists in the queue.
  180. func (q *chunkQueue) Has(index uint32) bool {
  181. q.Lock()
  182. defer q.Unlock()
  183. return q.chunkFiles[index] != ""
  184. }
  185. // load loads a chunk from disk, or nil if the chunk is not in the queue. The caller must hold the
  186. // mutex lock.
  187. func (q *chunkQueue) load(index uint32) (*chunk, error) {
  188. path, ok := q.chunkFiles[index]
  189. if !ok {
  190. return nil, nil
  191. }
  192. body, err := ioutil.ReadFile(path)
  193. if err != nil {
  194. return nil, fmt.Errorf("failed to load chunk %v: %w", index, err)
  195. }
  196. return &chunk{
  197. Height: q.snapshot.Height,
  198. Format: q.snapshot.Format,
  199. Index: index,
  200. Chunk: body,
  201. Sender: q.chunkSenders[index],
  202. }, nil
  203. }
  204. // Next returns the next chunk from the queue, or errDone if all chunks have been returned. It
  205. // blocks until the chunk is available. Concurrent Next() calls may return the same chunk.
  206. func (q *chunkQueue) Next() (*chunk, error) {
  207. q.Lock()
  208. var chunk *chunk
  209. index, err := q.nextUp()
  210. if err == nil {
  211. chunk, err = q.load(index)
  212. if err == nil {
  213. q.chunkReturned[index] = true
  214. }
  215. }
  216. q.Unlock()
  217. if chunk != nil || err != nil {
  218. return chunk, err
  219. }
  220. select {
  221. case _, ok := <-q.WaitFor(index):
  222. if !ok {
  223. return nil, errDone // queue closed
  224. }
  225. case <-time.After(chunkTimeout):
  226. return nil, errTimeout
  227. }
  228. q.Lock()
  229. defer q.Unlock()
  230. chunk, err = q.load(index)
  231. if err != nil {
  232. return nil, err
  233. }
  234. q.chunkReturned[index] = true
  235. return chunk, nil
  236. }
  237. // nextUp returns the next chunk to be returned, or errDone if all chunks have been returned. The
  238. // caller must hold the mutex lock.
  239. func (q *chunkQueue) nextUp() (uint32, error) {
  240. if q.snapshot == nil {
  241. return 0, errDone
  242. }
  243. for i := uint32(0); i < q.snapshot.Chunks; i++ {
  244. if !q.chunkReturned[i] {
  245. return i, nil
  246. }
  247. }
  248. return 0, errDone
  249. }
  250. // Retry schedules a chunk to be retried, without refetching it.
  251. func (q *chunkQueue) Retry(index uint32) {
  252. q.Lock()
  253. defer q.Unlock()
  254. delete(q.chunkReturned, index)
  255. }
  256. // RetryAll schedules all chunks to be retried, without refetching them.
  257. func (q *chunkQueue) RetryAll() {
  258. q.Lock()
  259. defer q.Unlock()
  260. q.chunkReturned = make(map[uint32]bool)
  261. }
  262. // Size returns the total number of chunks for the snapshot and queue, or 0 when closed.
  263. func (q *chunkQueue) Size() uint32 {
  264. q.Lock()
  265. defer q.Unlock()
  266. if q.snapshot == nil {
  267. return 0
  268. }
  269. return q.snapshot.Chunks
  270. }
  271. // WaitFor returns a channel that receives a chunk index when it arrives in the queue, or
  272. // immediately if it has already arrived. The channel is closed without a value if the queue is
  273. // closed or if the chunk index is not valid.
  274. func (q *chunkQueue) WaitFor(index uint32) <-chan uint32 {
  275. q.Lock()
  276. defer q.Unlock()
  277. ch := make(chan uint32, 1)
  278. switch {
  279. case q.snapshot == nil:
  280. close(ch)
  281. case index >= q.snapshot.Chunks:
  282. close(ch)
  283. case q.chunkFiles[index] != "":
  284. ch <- index
  285. close(ch)
  286. default:
  287. if q.waiters[index] == nil {
  288. q.waiters[index] = make([]chan<- uint32, 0)
  289. }
  290. q.waiters[index] = append(q.waiters[index], ch)
  291. }
  292. return ch
  293. }