Browse Source

Merge panics into errors in Parallel

pull/1780/head
Jae Kwon 6 years ago
parent
commit
e9cf47606c
2 changed files with 9 additions and 20 deletions
  1. +2
    -14
      common/async.go
  2. +7
    -6
      common/async_test.go

+ 2
- 14
common/async.go View File

@ -15,7 +15,6 @@ type Task func(i int) (val interface{}, err error, abort bool)
type TaskResult struct { type TaskResult struct {
Value interface{} Value interface{}
Error error Error error
Panic interface{}
} }
type TaskResultCh <-chan TaskResult type TaskResultCh <-chan TaskResult
@ -92,17 +91,6 @@ func (trs *TaskResultSet) FirstError() error {
return nil return nil
} }
// Returns the firstmost (by task index) panic as
// discovered by all previous Reap() calls.
func (trs *TaskResultSet) FirstPanic() interface{} {
for _, result := range trs.results {
if result.Panic != nil {
return result.Panic
}
}
return nil
}
//---------------------------------------- //----------------------------------------
// Parallel // Parallel
@ -128,7 +116,7 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
defer func() { defer func() {
if pnk := recover(); pnk != nil { if pnk := recover(); pnk != nil {
atomic.AddInt32(numPanics, 1) atomic.AddInt32(numPanics, 1)
taskResultCh <- TaskResult{nil, nil, pnk}
taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
taskDoneCh <- false taskDoneCh <- false
} }
}() }()
@ -136,7 +124,7 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
var val, err, abort = task(i) var val, err, abort = task(i)
// Send val/err to taskResultCh. // Send val/err to taskResultCh.
// NOTE: Below this line, nothing must panic/ // NOTE: Below this line, nothing must panic/
taskResultCh <- TaskResult{val, err, nil}
taskResultCh <- TaskResult{val, err}
// Decrement waitgroup. // Decrement waitgroup.
taskDoneCh <- abort taskDoneCh <- abort
}(i, task, taskResultCh) }(i, task, taskResultCh)


+ 7
- 6
common/async_test.go View File

@ -37,9 +37,6 @@ func TestParallel(t *testing.T) {
} else if taskResult.Error != nil { } else if taskResult.Error != nil {
assert.Fail(t, "Task should not have errored but got %v", taskResult.Error) assert.Fail(t, "Task should not have errored but got %v", taskResult.Error)
failedTasks += 1 failedTasks += 1
} else if taskResult.Panic != nil {
assert.Fail(t, "Task should not have panic'd but got %v", taskResult.Panic)
failedTasks += 1
} else if !assert.Equal(t, -1*i, taskResult.Value.(int)) { } else if !assert.Equal(t, -1*i, taskResult.Value.(int)) {
assert.Fail(t, "Task should have returned %v but got %v", -1*i, taskResult.Value.(int)) assert.Fail(t, "Task should have returned %v but got %v", -1*i, taskResult.Value.(int))
failedTasks += 1 failedTasks += 1
@ -49,7 +46,6 @@ func TestParallel(t *testing.T) {
} }
assert.Equal(t, failedTasks, 0, "No task should have failed") assert.Equal(t, failedTasks, 0, "No task should have failed")
assert.Nil(t, trs.FirstError(), "There should be no errors") assert.Nil(t, trs.FirstError(), "There should be no errors")
assert.Nil(t, trs.FirstPanic(), "There should be no panics")
assert.Equal(t, 0, trs.FirstValue(), "First value should be 0") assert.Equal(t, 0, trs.FirstValue(), "First value should be 0")
} }
@ -132,8 +128,13 @@ func checkResult(t *testing.T, taskResultSet *TaskResultSet, index int, val inte
taskName := fmt.Sprintf("Task #%v", index) taskName := fmt.Sprintf("Task #%v", index)
assert.True(t, ok, "TaskResultCh unexpectedly closed for %v", taskName) assert.True(t, ok, "TaskResultCh unexpectedly closed for %v", taskName)
assert.Equal(t, val, taskResult.Value, taskName) assert.Equal(t, val, taskResult.Value, taskName)
assert.Equal(t, err, taskResult.Error, taskName)
assert.Equal(t, pnk, taskResult.Panic, taskName)
if err != nil {
assert.Equal(t, err, taskResult.Error, taskName)
} else if pnk != nil {
assert.Equal(t, pnk, taskResult.Error.(Error).Cause(), taskName)
} else {
assert.Nil(t, taskResult.Error, taskName)
}
} }
// Wait for timeout (no result) // Wait for timeout (no result)


Loading…
Cancel
Save