You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
4.6 KiB

9 years ago
9 years ago
9 years ago
  1. package async
  2. import (
  3. "sync/atomic"
  4. "github.com/pkg/errors"
  5. )
  //----------------------------------------
  // Task

  // Task is a unit of work that Parallel runs in its own goroutine.
  // The argument i is the index of the task in the list handed to Parallel.
  //
  // Results:
  //   - val:   the value produced by the task.
  //   - abort: when true, tells Parallel to return right away, whether or
  //     not the other tasks have completed.
  //   - err:   the error produced by the task, if any.
  type Task func(i int) (val interface{}, abort bool, err error)
  // TaskResult is the outcome of one task as delivered on its result channel.
  //
  // The field order matters: Parallel builds TaskResult values positionally.
  type TaskResult struct {
  	// Value is the value the task returned (nil when the task panicked).
  	Value interface{}
  	// Error is the error the task returned, or an error describing the
  	// panic when the task panicked.
  	Error error
  }
  16. type TaskResultCh <-chan TaskResult
  17. type taskResultOK struct {
  18. TaskResult
  19. OK bool
  20. }
  21. type TaskResultSet struct {
  22. chz []TaskResultCh
  23. results []taskResultOK
  24. }
  25. func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
  26. return &TaskResultSet{
  27. chz: chz,
  28. results: make([]taskResultOK, len(chz)),
  29. }
  30. }
  31. func (trs *TaskResultSet) Channels() []TaskResultCh {
  32. return trs.chz
  33. }
  34. func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
  35. if len(trs.results) <= index {
  36. return TaskResult{}, false
  37. }
  38. resultOK := trs.results[index]
  39. return resultOK.TaskResult, resultOK.OK
  40. }
  41. // NOTE: Not concurrency safe.
  42. // Writes results to trs.results without waiting for all tasks to complete.
  43. func (trs *TaskResultSet) Reap() *TaskResultSet {
  44. for i := 0; i < len(trs.results); i++ {
  45. var trch = trs.chz[i]
  46. select {
  47. case result, ok := <-trch:
  48. if ok {
  49. // Write result.
  50. trs.results[i] = taskResultOK{
  51. TaskResult: result,
  52. OK: true,
  53. }
  54. }
  55. // else {
  56. // We already wrote it.
  57. // }
  58. default:
  59. // Do nothing.
  60. }
  61. }
  62. return trs
  63. }
  64. // NOTE: Not concurrency safe.
  65. // Like Reap() but waits until all tasks have returned or panic'd.
  66. func (trs *TaskResultSet) Wait() *TaskResultSet {
  67. for i := 0; i < len(trs.results); i++ {
  68. var trch = trs.chz[i]
  69. result, ok := <-trch
  70. if ok {
  71. // Write result.
  72. trs.results[i] = taskResultOK{
  73. TaskResult: result,
  74. OK: true,
  75. }
  76. }
  77. // else {
  78. // We already wrote it.
  79. // }
  80. }
  81. return trs
  82. }
  83. // Returns the firstmost (by task index) error as
  84. // discovered by all previous Reap() calls.
  85. func (trs *TaskResultSet) FirstValue() interface{} {
  86. for _, result := range trs.results {
  87. if result.Value != nil {
  88. return result.Value
  89. }
  90. }
  91. return nil
  92. }
  93. // Returns the firstmost (by task index) error as
  94. // discovered by all previous Reap() calls.
  95. func (trs *TaskResultSet) FirstError() error {
  96. for _, result := range trs.results {
  97. if result.Error != nil {
  98. return result.Error
  99. }
  100. }
  101. return nil
  102. }
  103. //----------------------------------------
  104. // Parallel
  105. // Run tasks in parallel, with ability to abort early.
  106. // Returns ok=false iff any of the tasks returned abort=true.
  107. // NOTE: Do not implement quit features here. Instead, provide convenient
  108. // concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
  109. // it's not Parallel's concern how you quit/abort your tasks).
  110. func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
  111. var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
  112. var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
  113. var numPanics = new(int32) // Keep track of panics to set ok=false later.
  114. // We will set it to false iff any tasks panic'd or returned abort.
  115. ok = true
  116. // Start all tasks in parallel in separate goroutines.
  117. // When the task is complete, it will appear in the
  118. // respective taskResultCh (associated by task index).
  119. for i, task := range tasks {
  120. var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result.
  121. taskResultChz[i] = taskResultCh
  122. go func(i int, task Task, taskResultCh chan TaskResult) {
  123. // Recovery
  124. defer func() {
  125. if pnk := recover(); pnk != nil {
  126. atomic.AddInt32(numPanics, 1)
  127. // Send panic to taskResultCh.
  128. taskResultCh <- TaskResult{nil, errors.Errorf("panic in task %v", pnk)}
  129. // Closing taskResultCh lets trs.Wait() work.
  130. close(taskResultCh)
  131. // Decrement waitgroup.
  132. taskDoneCh <- false
  133. }
  134. }()
  135. // Run the task.
  136. var val, abort, err = task(i)
  137. // Send val/err to taskResultCh.
  138. // NOTE: Below this line, nothing must panic/
  139. taskResultCh <- TaskResult{val, err}
  140. // Closing taskResultCh lets trs.Wait() work.
  141. close(taskResultCh)
  142. // Decrement waitgroup.
  143. taskDoneCh <- abort
  144. }(i, task, taskResultCh)
  145. }
  146. // Wait until all tasks are done, or until abort.
  147. // DONE_LOOP:
  148. for i := 0; i < len(tasks); i++ {
  149. abort := <-taskDoneCh
  150. if abort {
  151. ok = false
  152. break
  153. }
  154. }
  155. // Ok is also false if there were any panics.
  156. // We must do this check here (after DONE_LOOP).
  157. ok = ok && (atomic.LoadInt32(numPanics) == 0)
  158. return newTaskResultSet(taskResultChz).Reap(), ok
  159. }