You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
4.1 KiB

9 years ago
9 years ago
9 years ago
  1. package common
  2. import (
  3. "sync/atomic"
  4. )
  //----------------------------------------
  // Task

  // Task is a unit of work that Parallel runs in its own goroutine.
  // It is invoked with i, the task's position in Parallel's argument list.
  //
  // Results:
  //   val:   the value produced by the task.
  //   err:   the error produced by the task, if any.
  //   abort: when true, Parallel returns right away, whether or not the
  //          other tasks have completed.
  type Task func(i int) (val interface{}, err error, abort bool)
  // TaskResult is the outcome of a single Task as reported by Parallel.
  // A task that returned normally yields its Value and Error with a nil Panic;
  // a task that panicked yields only Panic, with Value and Error left nil.
  type TaskResult struct {
  	Value interface{} // val returned by the task
  	Error error       // err returned by the task
  	Panic interface{} // value recovered from the task's panic, if it panicked
  }
  16. type TaskResultCh <-chan TaskResult
  17. type taskResultOK struct {
  18. TaskResult
  19. OK bool
  20. }
  21. type TaskResultSet struct {
  22. chz []TaskResultCh
  23. results []taskResultOK
  24. }
  25. func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
  26. return &TaskResultSet{
  27. chz: chz,
  28. results: nil,
  29. }
  30. }
  31. func (trs *TaskResultSet) Channels() []TaskResultCh {
  32. return trs.chz
  33. }
  34. func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
  35. if len(trs.results) <= index {
  36. return TaskResult{}, false
  37. }
  38. resultOK := trs.results[index]
  39. return resultOK.TaskResult, resultOK.OK
  40. }
  41. // NOTE: Not concurrency safe.
  42. func (trs *TaskResultSet) Reap() *TaskResultSet {
  43. if trs.results == nil {
  44. trs.results = make([]taskResultOK, len(trs.chz))
  45. }
  46. for i := 0; i < len(trs.results); i++ {
  47. var trch = trs.chz[i]
  48. select {
  49. case result := <-trch:
  50. // Overwrite result.
  51. trs.results[i] = taskResultOK{
  52. TaskResult: result,
  53. OK: true,
  54. }
  55. default:
  56. // Do nothing.
  57. }
  58. }
  59. return trs
  60. }
  61. // Returns the firstmost (by task index) error as
  62. // discovered by all previous Reap() calls.
  63. func (trs *TaskResultSet) FirstValue() interface{} {
  64. for _, result := range trs.results {
  65. if result.Value != nil {
  66. return result.Value
  67. }
  68. }
  69. return nil
  70. }
  71. // Returns the firstmost (by task index) error as
  72. // discovered by all previous Reap() calls.
  73. func (trs *TaskResultSet) FirstError() error {
  74. for _, result := range trs.results {
  75. if result.Error != nil {
  76. return result.Error
  77. }
  78. }
  79. return nil
  80. }
  81. // Returns the firstmost (by task index) panic as
  82. // discovered by all previous Reap() calls.
  83. func (trs *TaskResultSet) FirstPanic() interface{} {
  84. for _, result := range trs.results {
  85. if result.Panic != nil {
  86. return result.Panic
  87. }
  88. }
  89. return nil
  90. }
  91. //----------------------------------------
  92. // Parallel
  93. // Run tasks in parallel, with ability to abort early.
  94. // Returns ok=false iff any of the tasks returned abort=true.
  95. // NOTE: Do not implement quit features here. Instead, provide convenient
  96. // concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
  97. // it's not Parallel's concern how you quit/abort your tasks).
  98. func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
  99. var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
  100. var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
  101. var numPanics = new(int32) // Keep track of panics to set ok=false later.
  102. ok = true // We will set it to false iff any tasks panic'd or returned abort.
  103. // Start all tasks in parallel in separate goroutines.
  104. // When the task is complete, it will appear in the
  105. // respective taskResultCh (associated by task index).
  106. for i, task := range tasks {
  107. var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result.
  108. taskResultChz[i] = taskResultCh
  109. go func(i int, task Task, taskResultCh chan TaskResult) {
  110. // Recovery
  111. defer func() {
  112. if pnk := recover(); pnk != nil {
  113. atomic.AddInt32(numPanics, 1)
  114. taskResultCh <- TaskResult{nil, nil, pnk}
  115. taskDoneCh <- false
  116. }
  117. }()
  118. // Run the task.
  119. var val, err, abort = task(i)
  120. // Send val/err to taskResultCh.
  121. // NOTE: Below this line, nothing must panic/
  122. taskResultCh <- TaskResult{val, err, nil}
  123. // Decrement waitgroup.
  124. taskDoneCh <- abort
  125. }(i, task, taskResultCh)
  126. }
  127. // Wait until all tasks are done, or until abort.
  128. // DONE_LOOP:
  129. for i := 0; i < len(tasks); i++ {
  130. abort := <-taskDoneCh
  131. if abort {
  132. ok = false
  133. break
  134. }
  135. }
  136. // Ok is also false if there were any panics.
  137. // We must do this check here (after DONE_LOOP).
  138. ok = ok && (atomic.LoadInt32(numPanics) == 0)
  139. return newTaskResultSet(taskResultChz).Reap(), ok
  140. }