Skip to content

Commit 3b4c426

Browse files
authored
fix: expire stale pending dispatch reservations (#1982)
1 parent ca6898a commit 3b4c426

File tree

11 files changed

+760
-128
lines changed

11 files changed

+760
-128
lines changed

internal/intg/distr/fixtures_test.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type fixtureConfig struct {
3939
workerCount int
4040
workerLabels map[string]string
4141
logPersistence bool
42+
configMutators []func(*config.Config)
4243
dagsDir string
4344
baseConfigPath string
4445
workerBaseConfigPath string // Override worker's base config path (for testing embedded base config)
@@ -72,6 +73,12 @@ func withLogPersistence() fixtureOption {
7273
return func(c *fixtureConfig) { c.logPersistence = true }
7374
}
7475

76+
func withConfigMutator(mutator func(*config.Config)) fixtureOption {
77+
return func(c *fixtureConfig) {
78+
c.configMutators = append(c.configMutators, mutator)
79+
}
80+
}
81+
7582
func withDAGsDir(dir string) fixtureOption {
7683
return func(c *fixtureConfig) { c.dagsDir = dir }
7784
}
@@ -164,6 +171,9 @@ func newTestFixture(t *testing.T, yaml string, opts ...fixtureOption) *testFixtu
164171
if cfg.staleHeartbeatThreshold > 0 || cfg.staleLeaseThreshold > 0 {
165172
coordOpts = append(coordOpts, test.WithStaleThresholds(cfg.staleHeartbeatThreshold, cfg.staleLeaseThreshold))
166173
}
174+
for _, mutate := range cfg.configMutators {
175+
coordOpts = append(coordOpts, test.WithConfigMutator(mutate))
176+
}
167177

168178
coord := test.SetupCoordinator(t, coordOpts...)
169179
coord.Config.Queues.Enabled = true
@@ -334,8 +344,10 @@ func (f *testFixture) startSchedulerWithOptions(
334344
}
335345

336346
schedulerCtx, schedulerCancel := f.schedulerCtx, f.schedulerCancel
347+
ownsSchedulerCtx := false
337348
if schedulerCtx == nil || schedulerCancel == nil {
338-
schedulerCtx, schedulerCancel = context.WithTimeout(f.coord.Context, startupTimeout)
349+
schedulerCtx, schedulerCancel = context.WithCancel(f.coord.Context)
350+
ownsSchedulerCtx = true
339351
}
340352
schedulerErrCh := make(chan error, 1)
341353

@@ -350,13 +362,36 @@ func (f *testFixture) startSchedulerWithOptions(
350362
}(schedulerInst, schedulerCtx, schedulerErrCh)
351363

352364
var startErr error
353-
require.Eventually(f.t, func() bool {
354-
if f.scheduler.IsRunning() {
355-
return true
356-
}
365+
startTicker := time.NewTicker(50 * time.Millisecond)
366+
defer startTicker.Stop()
367+
368+
startTimer := time.NewTimer(startupTimeout)
369+
defer startTimer.Stop()
370+
371+
for !f.scheduler.IsRunning() {
372+
357373
startErr = f.pollSchedulerErr()
358-
return startErr != nil
359-
}, startupTimeout, 50*time.Millisecond, "scheduler did not start in time")
374+
if startErr != nil {
375+
break
376+
}
377+
378+
select {
379+
case <-startTicker.C:
380+
case <-startTimer.C:
381+
if ownsSchedulerCtx && schedulerCancel != nil {
382+
schedulerCancel()
383+
require.Eventually(f.t, func() bool {
384+
startErr = f.pollSchedulerErr()
385+
return startErr != nil
386+
}, time.Second, 25*time.Millisecond, "scheduler startup did not stop after cancellation")
387+
}
388+
389+
if startErr != nil {
390+
require.FailNow(f.t, fmt.Sprintf("scheduler did not start in time: %v", startErr))
391+
}
392+
require.FailNow(f.t, "scheduler did not start in time")
393+
}
394+
}
360395
require.NoError(f.t, startErr)
361396
}
362397

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright (C) 2026 Yota Hamada
2+
// SPDX-License-Identifier: GPL-3.0-or-later
3+
4+
package distr_test
5+
6+
import (
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/dagucloud/dagu/internal/cmn/config"
12+
"github.com/dagucloud/dagu/internal/core"
13+
"github.com/dagucloud/dagu/internal/core/exec"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestExecution_QueuedDispatch_ConsumesOneThousandItems(t *testing.T) {
18+
const (
19+
queueName = "bulk-q"
20+
totalRuns = 1000
21+
)
22+
23+
f := newTestFixture(t, `
24+
type: graph
25+
name: shared-nothing-bulk-queue-test
26+
queue: bulk-q
27+
worker_selector:
28+
tier: "queue"
29+
steps:
30+
- name: step1
31+
command: echo "executed"
32+
`,
33+
withWorkerCount(10),
34+
withLabels(map[string]string{"tier": "queue"}),
35+
withConfigMutator(func(c *config.Config) {
36+
c.Queues.Enabled = true
37+
c.Queues.Config = []config.QueueConfig{{
38+
Name: queueName,
39+
MaxActiveRuns: 100,
40+
}}
41+
}),
42+
)
43+
defer f.cleanup()
44+
45+
for range totalRuns {
46+
require.NoError(t, f.enqueue())
47+
}
48+
49+
requireQueuedItemCountEventually(t, f, totalRuns, 60*time.Second)
50+
51+
f.startScheduler(30 * time.Second)
52+
53+
requireAllQueuedRunsConsumed(t, f, totalRuns, 10*time.Minute)
54+
}
55+
56+
func TestExecution_QueuedDispatch_RecoversWhenWorkerRegistersLater(t *testing.T) {
57+
f := newTestFixture(t, `
58+
type: graph
59+
name: shared-nothing-late-worker-test
60+
worker_selector:
61+
tier: "queue"
62+
steps:
63+
- name: step1
64+
command: echo "executed"
65+
`, withWorkerCount(0))
66+
defer f.cleanup()
67+
68+
require.NoError(t, f.enqueue())
69+
f.waitForQueued()
70+
f.startScheduler(30 * time.Second)
71+
72+
requireQueuedRunStillPending(t, f, 2*time.Second)
73+
74+
f.setupSharedNothingWorker("late-worker", map[string]string{"tier": "queue"}, "")
75+
76+
status := f.waitForStatus(core.Succeeded, 20*time.Second)
77+
require.Equal(t, "late-worker", status.WorkerID)
78+
requireQueueEventuallyEmpty(t, f)
79+
}
80+
81+
func TestExecution_QueuedDispatch_RecoversWhenMatchingWorkerRegistersLater(t *testing.T) {
82+
f := newTestFixture(t, `
83+
type: graph
84+
name: shared-nothing-selector-recovery-test
85+
worker_selector:
86+
tier: "queue"
87+
steps:
88+
- name: step1
89+
command: echo "executed"
90+
`, withWorkerCount(0))
91+
defer f.cleanup()
92+
93+
f.setupSharedNothingWorker("mismatched-worker", map[string]string{"tier": "other"}, "")
94+
95+
require.NoError(t, f.enqueue())
96+
f.waitForQueued()
97+
f.startScheduler(30 * time.Second)
98+
99+
requireQueuedRunStillPending(t, f, 2*time.Second)
100+
101+
f.setupSharedNothingWorker("matching-worker", map[string]string{"tier": "queue"}, "")
102+
103+
status := f.waitForStatus(core.Succeeded, 20*time.Second)
104+
require.Equal(t, "matching-worker", status.WorkerID)
105+
requireQueueEventuallyEmpty(t, f)
106+
}
107+
108+
func requireQueuedRunStillPending(t *testing.T, f *testFixture, duration time.Duration) {
109+
t.Helper()
110+
111+
require.Eventually(t, func() bool {
112+
status, err := f.latestStatus()
113+
if err != nil {
114+
return false
115+
}
116+
if status.Status != core.Queued {
117+
return false
118+
}
119+
count, err := queuedItemCount(f)
120+
if err != nil {
121+
return false
122+
}
123+
return count == 1
124+
}, 10*time.Second, 100*time.Millisecond, "run should remain queued while no usable worker exists")
125+
126+
require.Never(t, func() bool {
127+
status, err := f.latestStatus()
128+
if err != nil {
129+
return false
130+
}
131+
count, err := queuedItemCount(f)
132+
if err != nil {
133+
return false
134+
}
135+
return status.Status != core.Queued || count != 1
136+
}, duration, 100*time.Millisecond, "queued run should not disappear before a matching worker is available")
137+
}
138+
139+
func requireQueueEventuallyEmpty(t *testing.T, f *testFixture) {
140+
t.Helper()
141+
142+
requireQueuedItemCountEventually(t, f, 0, 10*time.Second)
143+
}
144+
145+
func queuedItemCount(f *testFixture) (int, error) {
146+
items, err := f.coord.QueueStore.ListByDAGName(f.coord.Context, f.dagWrapper.ProcGroup(), f.dagWrapper.Name)
147+
if err != nil {
148+
return 0, err
149+
}
150+
return len(items), nil
151+
}
152+
153+
func requireQueuedItemCountEventually(t *testing.T, f *testFixture, expected int, timeout time.Duration) {
154+
t.Helper()
155+
156+
require.Eventually(t, func() bool {
157+
count, err := queuedItemCount(f)
158+
return err == nil && count == expected
159+
}, timeout, 100*time.Millisecond, "expected %d queued items", expected)
160+
}
161+
162+
func requireAllQueuedRunsConsumed(t *testing.T, f *testFixture, expectedRuns int, timeout time.Duration) {
163+
t.Helper()
164+
165+
deadline := time.Now().Add(timeout)
166+
lastSummary := "no status collected"
167+
for time.Now().Before(deadline) {
168+
counts, total, err := dagRunStatusCounts(f)
169+
if err != nil {
170+
lastSummary = err.Error()
171+
time.Sleep(time.Second)
172+
continue
173+
}
174+
queueCount, err := queuedItemCount(f)
175+
if err != nil {
176+
lastSummary = err.Error()
177+
time.Sleep(time.Second)
178+
continue
179+
}
180+
181+
lastSummary = formatStatusSummary(counts, total, queueCount)
182+
183+
if total == expectedRuns &&
184+
queueCount == 0 &&
185+
counts[core.Succeeded] == expectedRuns &&
186+
counts[core.Queued] == 0 &&
187+
counts[core.Running] == 0 &&
188+
counts[core.NotStarted] == 0 &&
189+
counts[core.Failed] == 0 &&
190+
counts[core.Aborted] == 0 &&
191+
counts[core.Waiting] == 0 &&
192+
counts[core.Rejected] == 0 &&
193+
counts[core.PartiallySucceeded] == 0 {
194+
return
195+
}
196+
197+
time.Sleep(time.Second)
198+
}
199+
200+
t.Fatalf("expected %d queued runs to be fully consumed; last state: %s", expectedRuns, lastSummary)
201+
}
202+
203+
func dagRunStatusCounts(f *testFixture) (map[core.Status]int, int, error) {
204+
statuses, err := f.coord.DAGRunStore.ListStatuses(
205+
f.coord.Context,
206+
exec.WithExactName(f.dagWrapper.Name),
207+
exec.WithoutLimit(),
208+
)
209+
if err != nil {
210+
return nil, 0, err
211+
}
212+
213+
counts := make(map[core.Status]int)
214+
for _, status := range statuses {
215+
if status == nil {
216+
continue
217+
}
218+
counts[status.Status]++
219+
}
220+
221+
return counts, len(statuses), nil
222+
}
223+
224+
func formatStatusSummary(counts map[core.Status]int, total, queueCount int) string {
225+
return "total=" + itoa(total) +
226+
" queue=" + itoa(queueCount) +
227+
" succeeded=" + itoa(counts[core.Succeeded]) +
228+
" queued=" + itoa(counts[core.Queued]) +
229+
" running=" + itoa(counts[core.Running]) +
230+
" not_started=" + itoa(counts[core.NotStarted]) +
231+
" failed=" + itoa(counts[core.Failed]) +
232+
" aborted=" + itoa(counts[core.Aborted]) +
233+
" waiting=" + itoa(counts[core.Waiting]) +
234+
" rejected=" + itoa(counts[core.Rejected]) +
235+
" partial=" + itoa(counts[core.PartiallySucceeded])
236+
}
237+
238+
func itoa(v int) string {
239+
return strconv.Itoa(v)
240+
}

internal/intg/distr/zombie_recovery_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,13 @@ func startWorkerProcess(t *testing.T, f *testFixture, workerID, labels string) (
511511
}()
512512

513513
t.Cleanup(func() {
514-
if cmd.Process != nil && (cmd.ProcessState == nil || !cmd.ProcessState.Exited()) {
514+
select {
515+
case <-done:
516+
return
517+
default:
518+
}
519+
520+
if cmd.Process != nil {
515521
_ = cmdutil.KillProcessGroup(cmd, os.Kill)
516522
}
517523
select {

0 commit comments

Comments
 (0)