Skip to content

Commit b8e1662

Browse files
authored
Merge pull request #4 from mborders/cron-jobs
Added DispatchCron to allow for creating cron-based jobs
2 parents 4438e29 + 83bad4f commit b8e1662

5 files changed

Lines changed: 100 additions & 0 deletions

File tree

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Simple in-memory job queue for Golang using worker-based dispatching
99

1010
Documentation here: https://godoc.org/github.com/mborders/artifex
1111

12+
Cron jobs use the robfig/cron library: https://godoc.org/github.com/robfig/cron
13+
1214
## Example Usage
1315

1416
```go
@@ -32,6 +34,14 @@ dt, err := d.DispatchEvery(func() {
3234
// Stop a given DispatchTicker
3335
dt.Stop()
3436

37+
// Returns a DispatchCron
38+
dc, err := d.DispatchCron(func() {
39+
// do something every 1s
40+
}, "*/1 * * * * *")
41+
42+
// Stop a given DispatchCron
43+
dc.Stop()
44+
3545
// Stop a dispatcher and all its workers/tickers
3646
d.Stop()
3747
```

dispatcher.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package artifex
22

33
import (
44
"errors"
5+
"github.com/robfig/cron/v3"
56
"time"
67
)
78

@@ -12,6 +13,7 @@ type Dispatcher struct {
1213
maxQueue int
1314
workers []*Worker
1415
tickers []*DispatchTicker
16+
crons []*DispatchCron
1517
workerPool chan chan Job
1618
jobQueue chan Job
1719
quit chan bool
@@ -34,6 +36,7 @@ func NewDispatcher(maxWorkers int, maxQueue int) *Dispatcher {
3436
func (d *Dispatcher) Start() {
3537
d.workers = []*Worker{}
3638
d.tickers = []*DispatchTicker{}
39+
d.crons = []*DispatchCron{}
3740
d.workerPool = make(chan chan Job, d.maxWorkers)
3841
d.jobQueue = make(chan Job, d.maxQueue)
3942
d.quit = make(chan bool)
@@ -78,8 +81,13 @@ func (d *Dispatcher) Stop() {
7881
d.tickers[i].Stop()
7982
}
8083

84+
for i := range d.crons {
85+
d.crons[i].Stop()
86+
}
87+
8188
d.workers = []*Worker{}
8289
d.tickers = []*DispatchTicker{}
90+
d.crons = []*DispatchCron{}
8391
d.quit <- true
8492
}
8593

@@ -134,6 +142,28 @@ func (d *Dispatcher) DispatchEvery(run func(), interval time.Duration) (*Dispatc
134142
return dt, nil
135143
}
136144

145+
// DispatchEvery pushes the given job into the job queue
146+
// each time the cron definition is met
147+
func (d *Dispatcher) DispatchCron(run func(), cronStr string) (*DispatchCron, error) {
148+
if !d.active {
149+
return nil, errors.New("dispatcher is not active")
150+
}
151+
152+
dc := &DispatchCron{cron: cron.New(cron.WithSeconds())}
153+
d.crons = append(d.crons, dc)
154+
155+
_, err := dc.cron.AddFunc(cronStr, func() {
156+
d.jobQueue <- Job{Run: run}
157+
})
158+
159+
if err != nil {
160+
return nil, errors.New("invalid cron definition")
161+
}
162+
163+
dc.cron.Start()
164+
return dc, nil
165+
}
166+
137167
// DispatchTicker represents a dispatched job ticker
138168
// that executes on a given interval. This provides
139169
// a means for stopping the execution cycle from continuing.
@@ -147,3 +177,14 @@ func (dt *DispatchTicker) Stop() {
147177
dt.ticker.Stop()
148178
dt.quit <- true
149179
}
180+
181+
// DispatchCron represents a dispatched cron job
182+
// that executes using cron expression formats.
183+
type DispatchCron struct {
184+
cron *cron.Cron
185+
}
186+
187+
// Stops ends the execution cycle for the given cron.
188+
func (c *DispatchCron) Stop() {
189+
c.cron.Stop()
190+
}

dispatcher_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ func TestDispatcher_Stop(t *testing.T) {
160160
}, time.Millisecond*100)
161161
assert.NotNil(t, err)
162162

163+
_, err = d.DispatchCron(func() {
164+
}, "*/1 * * * * *")
165+
assert.NotNil(t, err)
166+
163167
time.Sleep(time.Millisecond * 100)
164168
assert.Equal(t, 1, c)
165169
}
@@ -209,3 +213,45 @@ func TestDispatcher_StopTwice(t *testing.T) {
209213
d.Stop()
210214
d.Stop()
211215
}
216+
217+
func TestDispatcher_DispatchCron(t *testing.T) {
218+
c := 0
219+
d := NewDispatcher(1, 3)
220+
d.Start()
221+
222+
_, err := d.DispatchCron(func() {
223+
c++
224+
}, "*/1 * * * * *")
225+
assert.Nil(t, err)
226+
227+
time.Sleep(time.Millisecond * 3000)
228+
assert.Equal(t, 3, c)
229+
}
230+
231+
func TestDispatcher_DispatchCron_Stop(t *testing.T) {
232+
c := 0
233+
d := NewDispatcher(1, 3)
234+
d.Start()
235+
236+
_, err := d.DispatchCron(func() {
237+
c++
238+
}, "*/1 * * * * *")
239+
assert.Nil(t, err)
240+
241+
time.Sleep(time.Millisecond * 3000)
242+
d.Stop()
243+
assert.Equal(t, 3, c)
244+
245+
time.Sleep(time.Second * 1)
246+
assert.Equal(t, 3, c)
247+
}
248+
249+
func TestDispatcher_DispatchCron_InvalidDefinition(t *testing.T) {
250+
d := NewDispatcher(1, 3)
251+
d.Start()
252+
253+
_, err := d.DispatchCron(func() {
254+
}, "foobar")
255+
assert.NotNil(t, err)
256+
assert.Equal(t, "invalid cron definition", err.Error())
257+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ module github.com/mborders/artifex
22

33
require (
44
github.com/davecgh/go-spew v1.1.1 // indirect
5+
github.com/robfig/cron/v3 v3.0.1
56
github.com/stretchr/testify v1.3.0
67
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
44
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
55
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
6+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
7+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
68
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
79
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
810
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=

0 commit comments

Comments
 (0)