Skip to content

Commit 6fc6d6f

Browse files
committed
fix(scheduler): update scheduled jobs mgmt (V4-1288)
1 parent c2d8098 commit 6fc6d6f

File tree

5 files changed

+55
-131
lines changed

5 files changed

+55
-131
lines changed

apps/api/src/http/routers/admin/task.router.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export function task() {
4141
await uniqueMiddleware(body.name, { req });
4242

4343
const task = await Task.create(body);
44-
await req.scope.cradle.scheduler.tasks.addJob(task);
44+
await req.scope.cradle.scheduler.tasks.updateTaskInQueue(task);
4545

4646
return { status: 201, body: task };
4747
},
@@ -53,7 +53,7 @@ export function task() {
5353
if (!task)
5454
throw new NotFoundError();
5555

56-
const bullJob = await req.scope.cradle.scheduler.tasks.getRepeatableJobById(taskId);
56+
const bullJob = await req.scope.cradle.scheduler.tasks.getScheduledJobById(taskId);
5757

5858
return { status: 200, body: { ...task.get(), bullJob } };
5959
},
@@ -68,9 +68,8 @@ export function task() {
6868
throw new NotFoundError();
6969

7070
await task.update(body);
71-
await req.scope.cradle.scheduler.tasks.updateJob(task);
72-
73-
const bullJob = await req.scope.cradle.scheduler.tasks.getRepeatableJobById(taskId);
71+
await req.scope.cradle.scheduler.tasks.updateTaskInQueue(task);
72+
const bullJob = await req.scope.cradle.scheduler.tasks.getScheduledJobById(taskId);
7473

7574
return { status: 200, body: { ...task.get(), bullJob } };
7675
},
@@ -82,8 +81,8 @@ export function task() {
8281
if (!task)
8382
throw new NotFoundError();
8483

84+
await req.scope.cradle.scheduler.tasks.removeTaskFromQueue(task);
8585
await task.destroy();
86-
await req.scope.cradle.scheduler.tasks.removeJob(task);
8786

8887
return { status: 204, body: undefined };
8988
},

apps/api/src/jobs/job.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export default abstract class Job<T extends JobType> {
2626
this.logger = logger;
2727
}
2828

29-
protected init(job: BullJob<JobData<JobParams[T]>>): void {
29+
protected init(job: BullJob<JobData<T>>): void {
3030
const {
3131
id,
3232
data: { params },
@@ -48,11 +48,11 @@ export default abstract class Job<T extends JobType> {
4848
* To be implemented by each job
4949
*
5050
* @abstract
51-
* @param {BullJob<JobData<JobParams[T]>>} job
51+
* @param {BullJob<JobData<T>>} job
5252
* @returns {Promise<void>}
5353
* @memberof Job
5454
*/
55-
abstract run(job: BullJob<JobData<JobParams[T]>>): Promise<void>;
55+
abstract run(job: BullJob<JobData<T>>): Promise<void>;
5656

5757
/**
5858
* Get current progress

apps/api/src/services/core/queues/jobs-queue-handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ export default class JobsQueueHandler extends QueueHandler<JobData> {
198198
private async queueJob(job: DbJob, options: JobsOptions = {}) {
199199
const { id, type, params } = job;
200200

201-
const bullJob = await this.queue.add(type, { params }, { ...options, jobId: `db-${id}` });
201+
const bullJob = await this.queue.add(type, { type, params }, { ...options, jobId: `db-${id}` });
202202

203203
this.logger.debug(`Queue ${this.name}: Job ${id} | ${type} queued.`);
204204

apps/api/src/services/core/queues/tasks-queue-handler.ts

Lines changed: 45 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import type { IoC } from '@intake24/api/ioc';
55
import type { Job } from '@intake24/api/jobs';
66
import type { JobData } from '@intake24/common/types';
77
import ioc from '@intake24/api/ioc';
8-
import { sleep } from '@intake24/api/util';
98
import { Task } from '@intake24/db';
109

1110
import { QueueHandler } from './queue-handler';
@@ -63,172 +62,98 @@ export default class TasksQueueHandler extends QueueHandler<JobData> {
6362

6463
this.workers.push(worker);
6564

66-
await this.clearRepeatableJobs();
67-
await this.loadRepeatableJobs();
65+
await this.clearScheduledJobs();
66+
await this.loadScheduledJobs();
6867

6968
this.logger.info(`${this.name} has been loaded.`);
7069
}
7170

7271
async processor(job: BullJob<JobData>) {
73-
const { id, name } = job;
72+
const { id, data: { type } } = job;
7473

7574
if (!id) {
7675
this.logger.error(`Queue ${this.name}: Job ID missing.`);
7776
return;
7877
}
7978

80-
const newJob = ioc.resolve<Job<any>>(name);
79+
const newJob = ioc.resolve<Job<typeof type>>(type);
8180
await newJob.run(job);
8281
}
8382

84-
async getRepeatableJobById(id: string) {
85-
const jobs = await this.queue.getRepeatableJobs();
83+
private async clearScheduledJobs() {
84+
const repeatableJobs = await this.queue.getJobSchedulers();
8685

87-
return jobs.find(job => job.id?.replace('db-', '') === id);
86+
await Promise.all(repeatableJobs.map(job => this.queue.removeJobScheduler(job.key)));
8887
}
8988

90-
/**
91-
* Remove repeatable job(s) from queue
92-
*
93-
* @private
94-
* @param {string} [id]
95-
* @memberof TasksQueueHandler
96-
*/
97-
private async clearRepeatableJobs(id?: string) {
98-
const repeatableJobs = await this.queue.getRepeatableJobs();
89+
private createJobParams(task: Task) {
90+
const { name, job, cron, params } = task;
9991

100-
for (const job of repeatableJobs) {
101-
if (id && job.id?.replace('db-', '') !== id)
102-
continue;
103-
104-
await this.queue.removeRepeatableByKey(job.key);
105-
}
92+
return {
93+
name,
94+
data: { type: job, params },
95+
opts: { pattern: cron },
96+
};
10697
}
10798

108-
/**
109-
* Load repeatable jobs from DB to the queue
110-
*
111-
* @private
112-
* @memberof TasksQueueHandler
113-
*/
114-
private async loadRepeatableJobs() {
99+
private async loadScheduledJobs() {
115100
const tasks = await Task.findAll({ where: { active: true } });
116101

117-
for (const task of tasks)
118-
await this.addJob(task);
102+
for (const task of tasks) {
103+
const { name, data, opts } = this.createJobParams(task);
104+
await this.queue.upsertJobScheduler(`db-${task.id}`, opts, { name, data });
105+
}
119106
}
120107

121-
/**
122-
* Push job into the queue
123-
*
124-
* @private
125-
* @param {Task} task
126-
* @memberof TasksQueueHandler
127-
*/
128-
private async queueJob(task: Task) {
129-
const { id, job, cron, params } = task;
108+
private async getScheduledJobByKey(key: string) {
109+
const jobs = await this.queue.getJobSchedulers();
130110

131-
return await this.queue.add(job, { params }, { repeat: { pattern: cron }, jobId: `db-${id}` });
111+
return jobs.find(job => job.key === key);
132112
}
133113

134-
/**
135-
* Remove job from queue
136-
*
137-
* @private
138-
* @param {Task} task
139-
* @memberof TasksQueueHandler
140-
*/
141-
private async dequeueJob(task: Task) {
142-
this.clearRepeatableJobs(task.id);
114+
async getScheduledJobById(id: string) {
115+
return await this.getScheduledJobByKey(`db-${id}`);
143116
}
144117

145-
/**
146-
* Add task's job to queue
147-
*
148-
* @param {Task} task
149-
* @memberof TasksQueueHandler
150-
*/
151-
public async addJob(task: Task) {
152-
const { id, name, active } = task;
153-
154-
if (!active) {
155-
this.logger.warn(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) not set as active.`);
156-
return;
118+
private async removeScheduledJobById(id: string) {
119+
const job = await this.getScheduledJobById(id);
120+
if (!job) {
121+
this.logger.debug(`Queue ${this.name}: Scheduled task (ID: ${id}) not in queue.`);
122+
return undefined;
157123
}
158124

159-
await this.queueJob(task);
160-
161-
this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) added.`);
125+
return await this.queue.removeJobScheduler(job.key);
162126
}
163127

164-
/**
165-
* Update task's job to queue
166-
*
167-
* @param {Task} task
168-
* @memberof TasksQueueHandler
169-
*/
170-
public async updateJob(task: Task) {
171-
const { id, name, active } = task;
128+
public async updateTaskInQueue(task: Task) {
129+
const { id, active } = task;
172130

173-
await this.dequeueJob(task);
174-
175-
/*
176-
* Bullmq bug
177-
* When repeatable job removed right away and new job pushed in, job entry doesn't end up in redis store
178-
* Though queue.add returns correct job entry
179-
* Workaround: simple sleep/wait for few ms solves it for now
180-
*/
181-
await sleep(20);
131+
if (!active) {
132+
await this.removeScheduledJobById(id);
133+
return;
134+
}
182135

183-
if (active)
184-
await this.queueJob(task);
136+
const { name, data, opts } = this.createJobParams(task);
137+
const job = await this.queue.upsertJobScheduler(`db-${id}`, opts, { name, data });
185138

186139
this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) updated.`);
187-
}
188-
189-
/**
190-
* Remove task's job from queue
191-
*
192-
* @param {Task} task
193-
* @memberof TasksQueueHandler
194-
*/
195-
public async removeJob(task: Task) {
196-
const { id, name } = task;
197-
198-
await this.dequeueJob(task);
199140

200-
this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) removed.`);
141+
return job;
201142
}
202143

203-
/**
204-
* Add task's job to queue
205-
*
206-
* @param {Task} task
207-
* @memberof TasksQueueHandler
208-
*/
209-
public async runJob(task: Task) {
210-
const { id, name, job, params } = task;
144+
public async removeTaskFromQueue(task: Task) {
145+
const { id, name } = task;
211146

212-
await this.queue.add(job, { params }, { delay: 500 });
147+
await this.removeScheduledJobById(id);
213148

214-
this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) queued.`);
149+
this.logger.debug(`Queue ${this.name}: Task (ID: ${id}, Name: ${name}) updated.`);
215150
}
216151

217-
/**
218-
* Pause all scheduled tasks in queue
219-
*
220-
* @memberof TasksQueueHandler
221-
*/
222152
public async pauseAll() {
223-
await this.clearRepeatableJobs();
153+
await this.clearScheduledJobs();
224154
}
225155

226-
/**
227-
* Resume all scheduled tasks in queue
228-
*
229-
* @memberof TasksQueueHandler
230-
*/
231156
public async resumeAll() {
232-
await this.loadRepeatableJobs();
157+
await this.loadScheduledJobs();
233158
}
234159
}

packages/common/src/types/jobs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export const repeatableBullJob = z.object({
1919

2020
export type RepeatableBullJob = z.infer<typeof repeatableBullJob>;
2121

22-
export type JobData<T = any> = { params: T };
22+
export type JobData<T extends JobType = JobType> = { type: T; params: JobParams[T] };
2323

2424
export const redisStoreTypes = ['cache', 'rateLimiter', 'session'] as const;
2525
export type RedisStoreType = (typeof redisStoreTypes)[number];

0 commit comments

Comments
 (0)