Skip to content

Commit a51665d

Browse files
committed
feat: Add reactive worker pool.
1 parent 3979f23 commit a51665d

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

pkg/run/pool.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package run
2+
3+
import "github.com/nitrictech/nitric/pkg/worker"
4+
5+
type WorkerEventType string
6+
7+
const (
8+
WorkerEventType_Add WorkerEventType = "add"
9+
WorkerEventType_Remove WorkerEventType = "remove"
10+
)
11+
12+
type WorkerEvent struct {
13+
Type WorkerEventType
14+
Worker worker.Worker
15+
}
16+
17+
type WorkerListener = func(WorkerEvent)
18+
19+
type RunProcessPool struct {
20+
worker.WorkerPool
21+
listeners []WorkerListener
22+
}
23+
24+
func (r *RunProcessPool) notifyListeners(evt WorkerEvent) {
25+
for _, l := range r.listeners {
26+
l(evt)
27+
}
28+
}
29+
30+
func (r *RunProcessPool) AddWorker(w worker.Worker) error {
31+
if err := r.WorkerPool.AddWorker(w); err != nil {
32+
return err
33+
}
34+
// notify listener of successfully added worker
35+
r.notifyListeners(WorkerEvent{
36+
Type: WorkerEventType_Add,
37+
Worker: w,
38+
})
39+
return nil
40+
}
41+
42+
func (r *RunProcessPool) RemoveWorker(w worker.Worker) error {
43+
if err := r.WorkerPool.RemoveWorker(w); err != nil {
44+
return err
45+
}
46+
// notify listener of successfully removed worker
47+
r.notifyListeners(WorkerEvent{
48+
Type: WorkerEventType_Remove,
49+
Worker: w,
50+
})
51+
return nil
52+
}
53+
54+
func (r *RunProcessPool) Listen(l WorkerListener) {
55+
r.listeners = append(r.listeners, l)
56+
}
57+
58+
func NewRunProcessPool() *RunProcessPool {
59+
return &RunProcessPool{
60+
listeners: make([]WorkerListener, 0),
61+
WorkerPool: worker.NewProcessPool(&worker.ProcessPoolOptions{
62+
MinWorkers: 0,
63+
MaxWorkers: 100,
64+
}),
65+
}
66+
}

0 commit comments

Comments
 (0)