This repository was archived by the owner on Apr 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathconcurrent.go
More file actions
52 lines (44 loc) · 1.31 KB
/
concurrent.go
File metadata and controls
52 lines (44 loc) · 1.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package fritz
import "sync/atomic"
// result carries the outcome of one concurrently executed task:
// a textual payload together with the error (if any) tied to it.
type result struct {
	msg string // payload text of the task outcome
	err error  // error associated with the outcome; nil when there is none
}
// Callback and work-table types consumed by scatterGather.
type (
	// successHandler turns a work item's key and the message returned by
	// its work function into a result; scatterGather calls it when the
	// work function returned a nil error.
	successHandler func(string, string) result

	// errorHandler turns a work item's key, the message returned by its
	// work function and the non-nil error it returned into a result.
	errorHandler func(string, string, error) result

	// workTable maps a key to the work function to run for that key.
	// Each work function yields a message and an error.
	workTable map[string]func() (string, error)
)
// scatterGather runs every entry of wt in its own goroutine and gathers
// exactly one result per entry.
//
// For each entry the work function is invoked. If it returns a nil error,
// the value produced by onSuccess(key, msg) is collected; otherwise the
// value produced by onError(key, msg, err) is collected. Results are
// returned in the order the workers deliver them, which is unrelated to
// any ordering of wt.
//
// An empty wt yields an empty, non-nil slice and starts no goroutines.
// Work functions and handlers must not panic: nothing in here recovers a
// panic raised inside a worker goroutine.
func scatterGather(wt workTable, onSuccess successHandler, onError errorHandler) []result {
	total := len(wt)
	if total == 0 {
		return []result{}
	}

	// The buffer holds one slot per entry, so a worker never blocks on send.
	collected := make(chan result, total)
	var finished uint64

	// worker executes a single table entry and reports via the matching handler.
	worker := func(name string, task func() (string, error)) {
		// Registers this worker as done after its send; closeOnDone closes
		// the channel once all workers have been counted.
		defer closeOnDone(&finished, uint64(total), collected)

		payload, err := task()
		if err != nil {
			collected <- onError(name, payload, err)
			return
		}
		collected <- onSuccess(name, payload)
	}
	for name, task := range wt {
		go worker(name, task)
	}

	// Drain until the channel is closed by the last worker to finish.
	gathered := make([]result, 0, total)
	for {
		next, open := <-collected
		if !open {
			return gathered
		}
		gathered = append(gathered, next)
	}
}
// closeOnDone atomically increments the counter behind ops and closes ch
// when the incremented value equals amountOfWork. Any other value leaves
// ch untouched.
func closeOnDone(ops *uint64, amountOfWork uint64, ch chan result) {
	completed := atomic.AddUint64(ops, 1)
	if completed != amountOfWork {
		return
	}
	// This call brought the counter to the expected total.
	close(ch)
}