You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

262 lines
6.2 KiB

  1. package statesync
  2. import (
  3. "container/heap"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/tendermint/tendermint/types"
  8. )
  9. type lightBlockResponse struct {
  10. block *types.LightBlock
  11. peer types.NodeID
  12. }
  13. // a block queue is used for asynchronously fetching and verifying light blocks
  14. type blockQueue struct {
  15. mtx sync.Mutex
  16. // cursors to keep track of which heights need to be fetched and verified
  17. fetchHeight int64
  18. verifyHeight int64
  19. // termination conditions
  20. stopHeight int64
  21. stopTime time.Time
  22. terminal *types.LightBlock
  23. // track failed heights so we know what blocks to try fetch again
  24. failed *maxIntHeap
  25. // also count retries to know when to give up
  26. retries int
  27. maxRetries int
  28. // store inbound blocks and serve them to a verifying thread via a channel
  29. pending map[int64]lightBlockResponse
  30. verifyCh chan lightBlockResponse
  31. // waiters are workers on idle until a height is required
  32. waiters []chan int64
  33. // this channel is closed once the verification process is complete
  34. doneCh chan struct{}
  35. }
  36. func newBlockQueue(
  37. startHeight, stopHeight int64,
  38. stopTime time.Time,
  39. maxRetries int,
  40. ) *blockQueue {
  41. return &blockQueue{
  42. stopHeight: stopHeight,
  43. stopTime: stopTime,
  44. fetchHeight: startHeight,
  45. verifyHeight: startHeight,
  46. pending: make(map[int64]lightBlockResponse),
  47. failed: &maxIntHeap{},
  48. retries: 0,
  49. maxRetries: maxRetries,
  50. waiters: make([]chan int64, 0),
  51. doneCh: make(chan struct{}),
  52. }
  53. }
  54. // Add adds a block to the queue to be verified and stored
  55. // CONTRACT: light blocks should have passed basic validation
  56. func (q *blockQueue) add(l lightBlockResponse) {
  57. q.mtx.Lock()
  58. defer q.mtx.Unlock()
  59. // return early if the process has already finished
  60. select {
  61. case <-q.doneCh:
  62. return
  63. default:
  64. }
  65. // sometimes more blocks are fetched then what is necessary. If we already
  66. // have what we need then ignore this
  67. if q.terminal != nil && l.block.Height < q.terminal.Height {
  68. return
  69. }
  70. // if the block that was returned is at the verify height then the verifier
  71. // is already waiting for this block so we send it directly to them
  72. if l.block.Height == q.verifyHeight && q.verifyCh != nil {
  73. q.verifyCh <- l
  74. close(q.verifyCh)
  75. q.verifyCh = nil
  76. } else {
  77. // else we add it in the pending bucket
  78. q.pending[l.block.Height] = l
  79. }
  80. // Lastly, if the incoming block is past the stop time and stop height then
  81. // we mark it as the terminal block
  82. if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) {
  83. q.terminal = l.block
  84. }
  85. }
  86. // NextHeight returns the next height that needs to be retrieved.
  87. // We assume that for every height allocated that the peer will eventually add
  88. // the block or signal that it needs to be retried
  89. func (q *blockQueue) nextHeight() <-chan int64 {
  90. q.mtx.Lock()
  91. defer q.mtx.Unlock()
  92. ch := make(chan int64, 1)
  93. // if a previous process failed then we pick up this one
  94. if q.failed.Len() > 0 {
  95. failedHeight := heap.Pop(q.failed)
  96. ch <- failedHeight.(int64)
  97. close(ch)
  98. return ch
  99. }
  100. if q.terminal == nil {
  101. // return and decrement the fetch height
  102. ch <- q.fetchHeight
  103. q.fetchHeight--
  104. close(ch)
  105. return ch
  106. }
  107. // at this point there is no height that we know we need so we create a
  108. // waiter to hold out for either an outgoing request to fail or a block to
  109. // fail verification
  110. q.waiters = append(q.waiters, ch)
  111. return ch
  112. }
  113. // Finished returns true when the block queue has has all light blocks retrieved,
  114. // verified and stored. There is no more work left to be done
  115. func (q *blockQueue) done() <-chan struct{} {
  116. return q.doneCh
  117. }
  118. // VerifyNext pulls the next block off the pending queue and adds it to a
  119. // channel if it's already there or creates a waiter to add it to the
  120. // channel once it comes in. NOTE: This is assumed to
  121. // be a single thread as light blocks need to be sequentially verified.
  122. func (q *blockQueue) verifyNext() <-chan lightBlockResponse {
  123. q.mtx.Lock()
  124. defer q.mtx.Unlock()
  125. ch := make(chan lightBlockResponse, 1)
  126. select {
  127. case <-q.doneCh:
  128. return ch
  129. default:
  130. }
  131. if lb, ok := q.pending[q.verifyHeight]; ok {
  132. ch <- lb
  133. close(ch)
  134. delete(q.pending, q.verifyHeight)
  135. } else {
  136. q.verifyCh = ch
  137. }
  138. return ch
  139. }
  140. // Retry is called when a dispatcher failed to fetch a light block or the
  141. // fetched light block failed verification. It signals to the queue to add the
  142. // height back to the request queue
  143. func (q *blockQueue) retry(height int64) {
  144. q.mtx.Lock()
  145. defer q.mtx.Unlock()
  146. select {
  147. case <-q.doneCh:
  148. return
  149. default:
  150. }
  151. // we don't need to retry if this is below the terminal height
  152. if q.terminal != nil && height < q.terminal.Height {
  153. return
  154. }
  155. q.retries++
  156. if q.retries >= q.maxRetries {
  157. q._closeChannels()
  158. return
  159. }
  160. if len(q.waiters) > 0 {
  161. q.waiters[0] <- height
  162. close(q.waiters[0])
  163. q.waiters = q.waiters[1:]
  164. } else {
  165. heap.Push(q.failed, height)
  166. }
  167. }
  168. // Success is called when a light block has been successfully verified and
  169. // processed
  170. func (q *blockQueue) success(height int64) {
  171. q.mtx.Lock()
  172. defer q.mtx.Unlock()
  173. if q.terminal != nil && q.verifyHeight == q.terminal.Height {
  174. q._closeChannels()
  175. }
  176. q.verifyHeight--
  177. }
  178. func (q *blockQueue) error() error {
  179. q.mtx.Lock()
  180. defer q.mtx.Unlock()
  181. if q.retries >= q.maxRetries {
  182. return fmt.Errorf("max retries to fetch valid blocks exceeded (%d); "+
  183. "target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight)
  184. }
  185. return nil
  186. }
  187. // close the queue and respective channels
  188. func (q *blockQueue) close() {
  189. q.mtx.Lock()
  190. defer q.mtx.Unlock()
  191. q._closeChannels()
  192. }
  193. // CONTRACT: must have a write lock. Use close instead
  194. func (q *blockQueue) _closeChannels() {
  195. close(q.doneCh)
  196. // wait for the channel to be drained
  197. select {
  198. case <-q.doneCh:
  199. return
  200. default:
  201. }
  202. for _, ch := range q.waiters {
  203. close(ch)
  204. }
  205. if q.verifyCh != nil {
  206. close(q.verifyCh)
  207. }
  208. }
  209. // A max-heap of ints.
  210. type maxIntHeap []int64
  211. func (h maxIntHeap) Len() int { return len(h) }
  212. func (h maxIntHeap) Less(i, j int) bool { return h[i] < h[j] }
  213. func (h maxIntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  214. func (h *maxIntHeap) Push(x interface{}) {
  215. *h = append(*h, x.(int64))
  216. }
  217. func (h *maxIntHeap) Pop() interface{} {
  218. old := *h
  219. n := len(old)
  220. x := old[n-1]
  221. *h = old[0 : n-1]
  222. return x
  223. }