You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

321 lines
8.6 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/tendermint/tendermint/p2p"
  12. )
  13. // errDone is returned by chunkQueue.Next() when all chunks have been returned.
  14. var errDone = errors.New("chunk queue has completed")
  15. // chunk contains data for a chunk.
  16. type chunk struct {
  17. Height uint64
  18. Format uint32
  19. Index uint32
  20. Chunk []byte
  21. Sender p2p.ID
  22. }
  23. // chunkQueue manages chunks for a state sync process, ordering them if requested. It acts as an
  24. // iterator over all chunks, but callers can request chunks to be retried, optionally after
  25. // refetching.
  26. type chunkQueue struct {
  27. sync.Mutex
  28. snapshot *snapshot // if this is nil, the queue has been closed
  29. dir string // temp dir for on-disk chunk storage
  30. chunkFiles map[uint32]string // path to temporary chunk file
  31. chunkSenders map[uint32]p2p.ID // the peer who sent the given chunk
  32. chunkAllocated map[uint32]bool // chunks that have been allocated via Allocate()
  33. chunkReturned map[uint32]bool // chunks returned via Next()
  34. waiters map[uint32][]chan<- uint32 // signals WaitFor() waiters about chunk arrival
  35. }
  36. // newChunkQueue creates a new chunk queue for a snapshot, using a temp dir for storage.
  37. // Callers must call Close() when done.
  38. func newChunkQueue(snapshot *snapshot, tempDir string) (*chunkQueue, error) {
  39. dir, err := ioutil.TempDir(tempDir, "tm-statesync")
  40. if err != nil {
  41. return nil, fmt.Errorf("unable to create temp dir for state sync chunks: %w", err)
  42. }
  43. if snapshot.Chunks == 0 {
  44. return nil, errors.New("snapshot has no chunks")
  45. }
  46. return &chunkQueue{
  47. snapshot: snapshot,
  48. dir: dir,
  49. chunkFiles: make(map[uint32]string, snapshot.Chunks),
  50. chunkSenders: make(map[uint32]p2p.ID, snapshot.Chunks),
  51. chunkAllocated: make(map[uint32]bool, snapshot.Chunks),
  52. chunkReturned: make(map[uint32]bool, snapshot.Chunks),
  53. waiters: make(map[uint32][]chan<- uint32),
  54. }, nil
  55. }
  56. // Add adds a chunk to the queue. It ignores chunks that already exist, returning false.
  57. func (q *chunkQueue) Add(chunk *chunk) (bool, error) {
  58. if chunk == nil || chunk.Chunk == nil {
  59. return false, errors.New("cannot add nil chunk")
  60. }
  61. q.Lock()
  62. defer q.Unlock()
  63. if q.snapshot == nil {
  64. return false, nil // queue is closed
  65. }
  66. if chunk.Height != q.snapshot.Height {
  67. return false, fmt.Errorf("invalid chunk height %v, expected %v", chunk.Height, q.snapshot.Height)
  68. }
  69. if chunk.Format != q.snapshot.Format {
  70. return false, fmt.Errorf("invalid chunk format %v, expected %v", chunk.Format, q.snapshot.Format)
  71. }
  72. if chunk.Index >= q.snapshot.Chunks {
  73. return false, fmt.Errorf("received unexpected chunk %v", chunk.Index)
  74. }
  75. if q.chunkFiles[chunk.Index] != "" {
  76. return false, nil
  77. }
  78. path := filepath.Join(q.dir, strconv.FormatUint(uint64(chunk.Index), 10))
  79. err := ioutil.WriteFile(path, chunk.Chunk, 0644)
  80. if err != nil {
  81. return false, fmt.Errorf("failed to save chunk %v to file %v: %w", chunk.Index, path, err)
  82. }
  83. q.chunkFiles[chunk.Index] = path
  84. q.chunkSenders[chunk.Index] = chunk.Sender
  85. // Signal any waiters that the chunk has arrived.
  86. for _, waiter := range q.waiters[chunk.Index] {
  87. waiter <- chunk.Index
  88. close(waiter)
  89. }
  90. delete(q.waiters, chunk.Index)
  91. return true, nil
  92. }
  93. // Allocate allocates a chunk to the caller, making it responsible for fetching it. Returns
  94. // errDone once no chunks are left or the queue is closed.
  95. func (q *chunkQueue) Allocate() (uint32, error) {
  96. q.Lock()
  97. defer q.Unlock()
  98. if q.snapshot == nil {
  99. return 0, errDone
  100. }
  101. if uint32(len(q.chunkAllocated)) >= q.snapshot.Chunks {
  102. return 0, errDone
  103. }
  104. for i := uint32(0); i < q.snapshot.Chunks; i++ {
  105. if !q.chunkAllocated[i] {
  106. q.chunkAllocated[i] = true
  107. return i, nil
  108. }
  109. }
  110. return 0, errDone
  111. }
  112. // Close closes the chunk queue, cleaning up all temporary files.
  113. func (q *chunkQueue) Close() error {
  114. q.Lock()
  115. defer q.Unlock()
  116. if q.snapshot == nil {
  117. return nil
  118. }
  119. for _, waiters := range q.waiters {
  120. for _, waiter := range waiters {
  121. close(waiter)
  122. }
  123. }
  124. q.waiters = nil
  125. q.snapshot = nil
  126. err := os.RemoveAll(q.dir)
  127. if err != nil {
  128. return fmt.Errorf("failed to clean up state sync tempdir %v: %w", q.dir, err)
  129. }
  130. return nil
  131. }
  132. // Discard discards a chunk. It will be removed from the queue, available for allocation, and can
  133. // be added and returned via Next() again. If the chunk is not already in the queue this does
  134. // nothing, to avoid it being allocated to multiple fetchers.
  135. func (q *chunkQueue) Discard(index uint32) error {
  136. q.Lock()
  137. defer q.Unlock()
  138. return q.discard(index)
  139. }
  140. // discard discards a chunk, scheduling it for refetching. The caller must hold the mutex lock.
  141. func (q *chunkQueue) discard(index uint32) error {
  142. if q.snapshot == nil {
  143. return nil
  144. }
  145. path := q.chunkFiles[index]
  146. if path == "" {
  147. return nil
  148. }
  149. err := os.Remove(path)
  150. if err != nil {
  151. return fmt.Errorf("failed to remove chunk %v: %w", index, err)
  152. }
  153. delete(q.chunkFiles, index)
  154. delete(q.chunkReturned, index)
  155. delete(q.chunkAllocated, index)
  156. return nil
  157. }
  158. // DiscardSender discards all *unreturned* chunks from a given sender. If the caller wants to
  159. // discard already returned chunks, this can be done via Discard().
  160. func (q *chunkQueue) DiscardSender(peerID p2p.ID) error {
  161. q.Lock()
  162. defer q.Unlock()
  163. for index, sender := range q.chunkSenders {
  164. if sender == peerID && !q.chunkReturned[index] {
  165. err := q.discard(index)
  166. if err != nil {
  167. return err
  168. }
  169. delete(q.chunkSenders, index)
  170. }
  171. }
  172. return nil
  173. }
  174. // GetSender returns the sender of the chunk with the given index, or empty if not found.
  175. func (q *chunkQueue) GetSender(index uint32) p2p.ID {
  176. q.Lock()
  177. defer q.Unlock()
  178. return q.chunkSenders[index]
  179. }
  180. // Has checks whether a chunk exists in the queue.
  181. func (q *chunkQueue) Has(index uint32) bool {
  182. q.Lock()
  183. defer q.Unlock()
  184. return q.chunkFiles[index] != ""
  185. }
  186. // load loads a chunk from disk, or nil if the chunk is not in the queue. The caller must hold the
  187. // mutex lock.
  188. func (q *chunkQueue) load(index uint32) (*chunk, error) {
  189. path, ok := q.chunkFiles[index]
  190. if !ok {
  191. return nil, nil
  192. }
  193. body, err := ioutil.ReadFile(path)
  194. if err != nil {
  195. return nil, fmt.Errorf("failed to load chunk %v: %w", index, err)
  196. }
  197. return &chunk{
  198. Height: q.snapshot.Height,
  199. Format: q.snapshot.Format,
  200. Index: index,
  201. Chunk: body,
  202. Sender: q.chunkSenders[index],
  203. }, nil
  204. }
  205. // Next returns the next chunk from the queue, or errDone if all chunks have been returned. It
  206. // blocks until the chunk is available. Concurrent Next() calls may return the same chunk.
  207. func (q *chunkQueue) Next() (*chunk, error) {
  208. q.Lock()
  209. var chunk *chunk
  210. index, err := q.nextUp()
  211. if err == nil {
  212. chunk, err = q.load(index)
  213. if err == nil {
  214. q.chunkReturned[index] = true
  215. }
  216. }
  217. q.Unlock()
  218. if chunk != nil || err != nil {
  219. return chunk, err
  220. }
  221. select {
  222. case _, ok := <-q.WaitFor(index):
  223. if !ok {
  224. return nil, errDone // queue closed
  225. }
  226. case <-time.After(chunkTimeout):
  227. return nil, errTimeout
  228. }
  229. q.Lock()
  230. defer q.Unlock()
  231. chunk, err = q.load(index)
  232. if err != nil {
  233. return nil, err
  234. }
  235. q.chunkReturned[index] = true
  236. return chunk, nil
  237. }
  238. // nextUp returns the next chunk to be returned, or errDone if all chunks have been returned. The
  239. // caller must hold the mutex lock.
  240. func (q *chunkQueue) nextUp() (uint32, error) {
  241. if q.snapshot == nil {
  242. return 0, errDone
  243. }
  244. for i := uint32(0); i < q.snapshot.Chunks; i++ {
  245. if !q.chunkReturned[i] {
  246. return i, nil
  247. }
  248. }
  249. return 0, errDone
  250. }
  251. // Retry schedules a chunk to be retried, without refetching it.
  252. func (q *chunkQueue) Retry(index uint32) {
  253. q.Lock()
  254. defer q.Unlock()
  255. delete(q.chunkReturned, index)
  256. }
  257. // RetryAll schedules all chunks to be retried, without refetching them.
  258. func (q *chunkQueue) RetryAll() {
  259. q.Lock()
  260. defer q.Unlock()
  261. q.chunkReturned = make(map[uint32]bool)
  262. }
  263. // Size returns the total number of chunks for the snapshot and queue, or 0 when closed.
  264. func (q *chunkQueue) Size() uint32 {
  265. q.Lock()
  266. defer q.Unlock()
  267. if q.snapshot == nil {
  268. return 0
  269. }
  270. return q.snapshot.Chunks
  271. }
  272. // WaitFor returns a channel that receives a chunk index when it arrives in the queue, or
  273. // immediately if it has already arrived. The channel is closed without a value if the queue is
  274. // closed or if the chunk index is not valid.
  275. func (q *chunkQueue) WaitFor(index uint32) <-chan uint32 {
  276. q.Lock()
  277. defer q.Unlock()
  278. ch := make(chan uint32, 1)
  279. switch {
  280. case q.snapshot == nil:
  281. close(ch)
  282. case index >= q.snapshot.Chunks:
  283. close(ch)
  284. case q.chunkFiles[index] != "":
  285. ch <- index
  286. close(ch)
  287. default:
  288. if q.waiters[index] == nil {
  289. q.waiters[index] = make([]chan<- uint32, 0)
  290. }
  291. q.waiters[index] = append(q.waiters[index], ch)
  292. }
  293. return ch
  294. }