You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

179 lines
4.5 KiB

9 years ago
9 years ago
9 years ago
  1. package common
  2. import (
  3. "sync/atomic"
  4. )
  5. //----------------------------------------
  6. // Task
  7. // val: the value returned after task execution.
  8. // err: the error returned during task completion.
  9. // abort: tells Parallel to return, whether or not all tasks have completed.
  10. type Task func(i int) (val interface{}, err error, abort bool)
  11. type TaskResult struct {
  12. Value interface{}
  13. Error error
  14. }
  15. type TaskResultCh <-chan TaskResult
  16. type taskResultOK struct {
  17. TaskResult
  18. OK bool
  19. }
  20. type TaskResultSet struct {
  21. chz []TaskResultCh
  22. results []taskResultOK
  23. }
  24. func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
  25. return &TaskResultSet{
  26. chz: chz,
  27. results: make([]taskResultOK, len(chz)),
  28. }
  29. }
  30. func (trs *TaskResultSet) Channels() []TaskResultCh {
  31. return trs.chz
  32. }
  33. func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
  34. if len(trs.results) <= index {
  35. return TaskResult{}, false
  36. }
  37. resultOK := trs.results[index]
  38. return resultOK.TaskResult, resultOK.OK
  39. }
  40. // NOTE: Not concurrency safe.
  41. // Writes results to trs.results without waiting for all tasks to complete.
  42. func (trs *TaskResultSet) Reap() *TaskResultSet {
  43. for i := 0; i < len(trs.results); i++ {
  44. var trch = trs.chz[i]
  45. select {
  46. case result, ok := <-trch:
  47. if ok {
  48. // Write result.
  49. trs.results[i] = taskResultOK{
  50. TaskResult: result,
  51. OK: true,
  52. }
  53. }
  54. // else {
  55. // We already wrote it.
  56. // }
  57. default:
  58. // Do nothing.
  59. }
  60. }
  61. return trs
  62. }
  63. // NOTE: Not concurrency safe.
  64. // Like Reap() but waits until all tasks have returned or panic'd.
  65. func (trs *TaskResultSet) Wait() *TaskResultSet {
  66. for i := 0; i < len(trs.results); i++ {
  67. var trch = trs.chz[i]
  68. result, ok := <-trch
  69. if ok {
  70. // Write result.
  71. trs.results[i] = taskResultOK{
  72. TaskResult: result,
  73. OK: true,
  74. }
  75. }
  76. // else {
  77. // We already wrote it.
  78. // }
  79. }
  80. return trs
  81. }
  82. // Returns the firstmost (by task index) error as
  83. // discovered by all previous Reap() calls.
  84. func (trs *TaskResultSet) FirstValue() interface{} {
  85. for _, result := range trs.results {
  86. if result.Value != nil {
  87. return result.Value
  88. }
  89. }
  90. return nil
  91. }
  92. // Returns the firstmost (by task index) error as
  93. // discovered by all previous Reap() calls.
  94. func (trs *TaskResultSet) FirstError() error {
  95. for _, result := range trs.results {
  96. if result.Error != nil {
  97. return result.Error
  98. }
  99. }
  100. return nil
  101. }
  102. //----------------------------------------
  103. // Parallel
  104. // Run tasks in parallel, with ability to abort early.
  105. // Returns ok=false iff any of the tasks returned abort=true.
  106. // NOTE: Do not implement quit features here. Instead, provide convenient
  107. // concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
  108. // it's not Parallel's concern how you quit/abort your tasks).
  109. func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
  110. var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
  111. var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
  112. var numPanics = new(int32) // Keep track of panics to set ok=false later.
  113. // We will set it to false iff any tasks panic'd or returned abort.
  114. ok = true
  115. // Start all tasks in parallel in separate goroutines.
  116. // When the task is complete, it will appear in the
  117. // respective taskResultCh (associated by task index).
  118. for i, task := range tasks {
  119. var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result.
  120. taskResultChz[i] = taskResultCh
  121. go func(i int, task Task, taskResultCh chan TaskResult) {
  122. // Recovery
  123. defer func() {
  124. if pnk := recover(); pnk != nil {
  125. atomic.AddInt32(numPanics, 1)
  126. // Send panic to taskResultCh.
  127. taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
  128. // Closing taskResultCh lets trs.Wait() work.
  129. close(taskResultCh)
  130. // Decrement waitgroup.
  131. taskDoneCh <- false
  132. }
  133. }()
  134. // Run the task.
  135. var val, err, abort = task(i)
  136. // Send val/err to taskResultCh.
  137. // NOTE: Below this line, nothing must panic/
  138. taskResultCh <- TaskResult{val, err}
  139. // Closing taskResultCh lets trs.Wait() work.
  140. close(taskResultCh)
  141. // Decrement waitgroup.
  142. taskDoneCh <- abort
  143. }(i, task, taskResultCh)
  144. }
  145. // Wait until all tasks are done, or until abort.
  146. // DONE_LOOP:
  147. for i := 0; i < len(tasks); i++ {
  148. abort := <-taskDoneCh
  149. if abort {
  150. ok = false
  151. break
  152. }
  153. }
  154. // Ok is also false if there were any panics.
  155. // We must do this check here (after DONE_LOOP).
  156. ok = ok && (atomic.LoadInt32(numPanics) == 0)
  157. return newTaskResultSet(taskResultChz).Reap(), ok
  158. }