Skip to content

Commit 618ee9a

Browse files
authored
Merge pull request #656 from kibertoad/feat/perf
feat: Performance improvements
2 parents 499e8f2 + 519bafe commit 618ee9a

File tree

3 files changed

+104
-62
lines changed

3 files changed

+104
-62
lines changed

src/boss.ts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,40 @@ class Boss extends EventEmitter implements types.EventsMixin {
6262
}
6363
}
6464

65-
async #executeSql (sql: string, values?: unknown[]) {
65+
async #executeSql (sql: string) {
6666
const started = Date.now()
6767

68-
const result = unwrapSQLResult(await this.#db.executeSql(sql, values))
68+
const result = unwrapSQLResult(await this.#db.executeSql(sql))
6969

70-
const ended = Date.now()
70+
const elapsed = (Date.now() - started) / 1000
7171

72-
const elapsed = (ended - started) / 1000
72+
if (
73+
elapsed > WARNINGS.SLOW_QUERY.seconds ||
74+
this.#config.__test__warn_slow_query
75+
) {
76+
this.emit(events.warning, {
77+
message: WARNINGS.SLOW_QUERY.message,
78+
data: { elapsed, sql },
79+
})
80+
}
81+
82+
return result
83+
}
84+
85+
async #executeQuery (query: plans.SqlQuery) {
86+
const started = Date.now()
87+
88+
const result = unwrapSQLResult(await this.#db.executeSql(query.text, query.values))
89+
90+
const elapsed = (Date.now() - started) / 1000
7391

7492
if (
7593
elapsed > WARNINGS.SLOW_QUERY.seconds ||
7694
this.#config.__test__warn_slow_query
7795
) {
7896
this.emit(events.warning, {
7997
message: WARNINGS.SLOW_QUERY.message,
80-
data: { elapsed, sql, values },
98+
data: { elapsed, sql: query.text, values: query.values },
8199
})
82100
}
83101

@@ -94,21 +112,19 @@ class Boss extends EventEmitter implements types.EventsMixin {
94112

95113
const queues = await this.#manager.getQueues()
96114

97-
for (const queue of queues) {
98-
!this.#stopped && (await this.supervise(queue))
99-
}
115+
!this.#stopped && (await this.supervise(queues))
100116
} catch (err) {
101117
this.emit(events.error, err)
102118
} finally {
103119
this.#maintaining = false
104120
}
105121
}
106122

107-
async supervise (value?: string | types.QueueResult) {
108-
let queues
123+
async supervise (value?: string | types.QueueResult[]) {
124+
let queues: types.QueueResult[]
109125

110-
if (typeof value === 'object') {
111-
queues = [value]
126+
if (Array.isArray(value)) {
127+
queues = value
112128
} else {
113129
queues = await this.#manager.getQueues(value)
114130
}
@@ -129,8 +145,10 @@ class Boss extends EventEmitter implements types.EventsMixin {
129145
while (names.length) {
130146
const chunk = names.splice(0, 100)
131147

132-
await this.#monitor(table, chunk)
133-
await this.#maintain(table, chunk)
148+
await Promise.all([
149+
this.#monitor(table, chunk),
150+
this.#maintain(table, chunk)
151+
])
134152
}
135153
}
136154
}
@@ -141,7 +159,7 @@ class Boss extends EventEmitter implements types.EventsMixin {
141159
names,
142160
this.#config.monitorIntervalSeconds
143161
)
144-
const { rows } = await this.#executeSql(command)
162+
const { rows } = await this.#executeQuery(command)
145163

146164
if (rows.length) {
147165
const queues = rows.map((q) => q.name)
@@ -168,7 +186,7 @@ class Boss extends EventEmitter implements types.EventsMixin {
168186
names,
169187
this.#config.maintenanceIntervalSeconds
170188
)
171-
const { rows } = await this.#executeSql(command)
189+
const { rows } = await this.#executeQuery(command)
172190

173191
if (rows.length) {
174192
const queues = rows.map((q) => q.name)

src/manager.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -474,12 +474,12 @@ class Manager extends EventEmitter implements types.EventsMixin {
474474
ignoreSingletons: singletonsActive
475475
}
476476

477-
const sql = plans.fetchNextJob(fetchOptions)
477+
const query = plans.fetchNextJob(fetchOptions)
478478

479479
let result
480480

481481
try {
482-
result = await db.executeSql(sql)
482+
result = await db.executeSql(query.text, query.values)
483483
} catch (err) {
484484
// errors from fetchquery should only be unique constraint violations
485485
}
@@ -606,8 +606,8 @@ class Manager extends EventEmitter implements types.EventsMixin {
606606
}
607607
}
608608

609-
const sql = plans.getQueues(this.config.schema, names)
610-
const { rows } = await this.db.executeSql(sql)
609+
const query = plans.getQueues(this.config.schema, names)
610+
const { rows } = await this.db.executeSql(query.text, query.values)
611611
return rows
612612
}
613613

@@ -640,8 +640,8 @@ class Manager extends EventEmitter implements types.EventsMixin {
640640
async getQueue (name: string) {
641641
Attorney.assertQueueName(name)
642642

643-
const sql = plans.getQueues(this.config.schema, [name])
644-
const { rows } = await this.db.executeSql(sql)
643+
const query = plans.getQueues(this.config.schema, [name])
644+
const { rows } = await this.db.executeSql(query.text, query.values)
645645

646646
return rows[0] || null
647647
}
@@ -694,9 +694,9 @@ class Manager extends EventEmitter implements types.EventsMixin {
694694

695695
const queue = await this.getQueueCache(name)
696696

697-
const sql = plans.getQueueStats(this.config.schema, queue.table, [name])
697+
const query = plans.getQueueStats(this.config.schema, queue.table, [name])
698698

699-
const { rows } = await this.db.executeSql(sql)
699+
const { rows } = await this.db.executeSql(query.text, query.values)
700700

701701
return Object.assign(queue, rows.at(0) || {})
702702
}

src/plans.ts

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import type { UpdateQueueOptions } from './types.ts'
22

3+
export interface SqlQuery {
4+
text: string
5+
values: unknown[]
6+
}
7+
38
const DEFAULT_SCHEMA = 'pgboss'
49
const MIGRATE_RACE_MESSAGE = 'division by zero'
510
const CREATE_RACE_MESSAGE = 'already exists'
11+
const SINGLE_QUOTE_REGEX = /'/g
612
const FIFTEEN_MINUTES = 60 * 15
713
const FORTEEN_DAYS = 60 * 60 * 24 * 14
814
const SEVEN_DAYS = 60 * 60 * 24 * 7
@@ -376,11 +382,11 @@ function createIndexJobPolicyExclusive (schema: string) {
376382
return `CREATE UNIQUE INDEX job_i6 ON ${schema}.job (name, COALESCE(singleton_key, '')) WHERE state <= '${JOB_STATES.active}' AND policy = '${QUEUE_POLICIES.exclusive}'`
377383
}
378384

379-
function trySetQueueMonitorTime (schema: string, queues: string[], seconds: number) {
385+
function trySetQueueMonitorTime (schema: string, queues: string[], seconds: number): SqlQuery {
380386
return trySetQueueTimestamp(schema, queues, 'monitor_on', seconds)
381387
}
382388

383-
function trySetQueueDeletionTime (schema: string, queues: string[], seconds: number) {
389+
function trySetQueueDeletionTime (schema: string, queues: string[], seconds: number): SqlQuery {
384390
return trySetQueueTimestamp(schema, queues, 'maintain_on', seconds)
385391
}
386392

@@ -397,14 +403,17 @@ function trySetTimestamp (schema: string, column: string, seconds: number) {
397403
`
398404
}
399405

400-
function trySetQueueTimestamp (schema: string, queues: string[], column: string, seconds: number) {
401-
return `
406+
function trySetQueueTimestamp (schema: string, queues: string[], column: string, seconds: number): SqlQuery {
407+
return {
408+
text: `
402409
UPDATE ${schema}.queue
403410
SET ${column} = now()
404-
WHERE name IN(${getQueueInClause(queues)})
411+
WHERE name = ANY($1::text[])
405412
AND EXTRACT( EPOCH FROM (now() - COALESCE(${column}, now() - interval '1 week') ) ) > ${seconds}
406-
RETURNING name
407-
`
413+
RETURNING name
414+
`,
415+
values: [queues]
416+
}
408417
}
409418

410419
function updateQueue (schema: string, { deadLetter }: UpdateQueueOptions = {}) {
@@ -432,9 +441,11 @@ function updateQueue (schema: string, { deadLetter }: UpdateQueueOptions = {}) {
432441
`
433442
}
434443

435-
function getQueues (schema: string, names?: string[]) {
436-
return `
437-
SELECT
444+
function getQueues (schema: string, names?: string[]): SqlQuery {
445+
const hasNames = names && names.length > 0
446+
return {
447+
text: `
448+
SELECT
438449
q.name,
439450
q.policy,
440451
q.retry_limit as "retryLimit",
@@ -456,8 +467,10 @@ function getQueues (schema: string, names?: string[]) {
456467
q.created_on as "createdOn",
457468
q.updated_on as "updatedOn"
458469
FROM ${schema}.queue q
459-
${names ? `WHERE q.name IN (${names.map(i => `'${i}'`)})` : ''}
460-
`
470+
${hasNames ? 'WHERE q.name = ANY($1::text[])' : ''}
471+
`,
472+
values: hasNames ? [names] : []
473+
}
461474
}
462475

463476
function deleteJobsById (schema: string, table: string) {
@@ -574,18 +587,20 @@ interface FetchJobOptions {
574587
ignoreSingletons: string[] | null
575588
}
576589

577-
function fetchNextJob ({ schema, table, name, policy, limit, includeMetadata, priority = true, ignoreStartAfter = false, ignoreSingletons = null }: FetchJobOptions) {
590+
function fetchNextJob ({ schema, table, name, policy, limit, includeMetadata, priority = true, ignoreStartAfter = false, ignoreSingletons = null }: FetchJobOptions): SqlQuery {
578591
const singletonFetch = limit > 1 && (policy === QUEUE_POLICIES.singleton || policy === QUEUE_POLICIES.stately)
579592
const cte = singletonFetch ? 'grouped' : 'next'
593+
const hasIgnoreSingletons = ignoreSingletons != null && ignoreSingletons.length > 0
580594

581-
return `
595+
return {
596+
text: `
582597
WITH next as (
583598
SELECT id ${singletonFetch ? ', singleton_key' : ''}
584599
FROM ${schema}.${table}
585600
WHERE name = '${name}'
586601
AND state < '${JOB_STATES.active}'
587602
${ignoreStartAfter ? '' : 'AND start_after < now()'}
588-
${ignoreSingletons != null && ignoreSingletons?.length > 0 ? `AND singleton_key NOT IN (${ignoreSingletons.map(i => `'${i}'`).join()})` : ''}
603+
${hasIgnoreSingletons ? 'AND singleton_key <> ALL($1::text[])' : ''}
589604
ORDER BY ${priority ? 'priority desc, ' : ''}created_on, id
590605
LIMIT ${limit}
591606
FOR UPDATE SKIP LOCKED
@@ -598,8 +613,10 @@ function fetchNextJob ({ schema, table, name, policy, limit, includeMetadata, pr
598613
FROM ${cte}
599614
WHERE name = '${name}' AND j.id = ${cte}.id
600615
${singletonFetch ? ` AND ${cte}.row_number = 1` : ''}
601-
RETURNING j.${includeMetadata ? JOB_COLUMNS_ALL : JOB_COLUMNS_MIN}
602-
`
616+
RETURNING j.${includeMetadata ? JOB_COLUMNS_ALL : JOB_COLUMNS_MIN}
617+
`,
618+
values: hasIgnoreSingletons ? [ignoreSingletons] : []
619+
}
603620
}
604621

605622
function completeJobs (schema: string, table: string) {
@@ -732,10 +749,10 @@ function failJobsById (schema: string, table: string) {
732749
return failJobs(schema, table, where, output)
733750
}
734751

735-
function failJobsByTimeout (schema: string, table: string, queues: string[]) {
752+
function failJobsByTimeout (schema: string, table: string, queues: string[]): string {
736753
const where = `state = '${JOB_STATES.active}'
737754
AND (started_on + expire_seconds * interval '1s') < now()
738-
AND name IN (${getQueueInClause(queues)})`
755+
AND name = ANY(${serializeArrayParam(queues)})`
739756

740757
const output = '\'{ "value": { "message": "job timed out" } }\'::jsonb'
741758

@@ -889,16 +906,16 @@ function failJobs (schema: string, table: string, where: string, output: string)
889906
`
890907
}
891908

892-
function deletion (schema: string, table: string, queues: string[]) {
909+
function deletion (schema: string, table: string, queues: string[]): string {
893910
const sql = `
894911
DELETE FROM ${schema}.${table}
895-
WHERE name IN (${getQueueInClause(queues)})
912+
WHERE name = ANY(${serializeArrayParam(queues)})
896913
AND
897914
(
898915
completed_on + deletion_seconds * interval '1s' < now()
899916
OR
900917
(state < '${JOB_STATES.active}' AND keep_until < now())
901-
)
918+
)
902919
`
903920

904921
return locked(schema, sql, table + 'deletion')
@@ -919,24 +936,31 @@ function retryJobs (schema: string, table: string) {
919936
`
920937
}
921938

922-
function getQueueStats (schema: string, table: string, queues: string[]) {
923-
return `
939+
function getQueueStats (schema: string, table: string, queues: string[]): SqlQuery {
940+
return {
941+
text: `
924942
SELECT
925-
name,
943+
name,
926944
(count(*) FILTER (WHERE start_after > now()))::int as "deferredCount",
927945
(count(*) FILTER (WHERE state < '${JOB_STATES.active}'))::int as "queuedCount",
928946
(count(*) FILTER (WHERE state = '${JOB_STATES.active}'))::int as "activeCount",
929947
count(*)::int as "totalCount",
930948
array_agg(singleton_key) FILTER (WHERE policy IN ('${QUEUE_POLICIES.singleton}','${QUEUE_POLICIES.stately}') AND state = '${JOB_STATES.active}') as "singletonsActive"
931949
FROM ${schema}.${table}
932-
WHERE name IN (${getQueueInClause(queues)})
950+
WHERE name = ANY($1::text[])
933951
GROUP BY 1
934-
`
952+
`,
953+
values: [queues]
954+
}
935955
}
936956

937-
function cacheQueueStats (schema: string, table: string, queues: string[]) {
957+
function cacheQueueStats (schema: string, table: string, queues: string[]): string {
958+
const statsQuery = getQueueStats(schema, table, queues)
959+
// Serialize the $1 parameter for use in locked() multi-statement query
960+
const statsText = statsQuery.text.replace('$1::text[]', serializeArrayParam(queues))
961+
938962
const sql = `
939-
WITH stats AS (${getQueueStats(schema, table, queues)})
963+
WITH stats AS (${statsText})
940964
UPDATE ${schema}.queue SET
941965
deferred_count = "deferredCount",
942966
queued_count = "queuedCount",
@@ -954,17 +978,21 @@ function cacheQueueStats (schema: string, table: string, queues: string[]) {
954978
return locked(schema, sql, 'queue-stats')
955979
}
956980

957-
function locked (schema: string, query: string | string[], key?: string) {
958-
if (Array.isArray(query)) {
959-
query = query.join(';\n')
960-
}
981+
// Serialize a string array for embedding directly in SQL as PostgreSQL array literal
982+
function serializeArrayParam (values: string[]): string {
983+
const escaped = values.map(v => `'${v.replace(SINGLE_QUOTE_REGEX, "''")}'`)
984+
return `ARRAY[${escaped.join(',')}]::text[]`
985+
}
986+
987+
function locked (schema: string, query: string | string[], key?: string): string {
988+
const sql = Array.isArray(query) ? query.join(';\n') : query
961989

962990
return `
963991
BEGIN;
964992
SET LOCAL lock_timeout = 30000;
965993
SET LOCAL idle_in_transaction_session_timeout = 30000;
966994
${advisoryLock(schema, key)};
967-
${query};
995+
${sql};
968996
COMMIT;
969997
`
970998
}
@@ -984,10 +1012,6 @@ function getJobById (schema: string, table: string) {
9841012
return `SELECT ${JOB_COLUMNS_ALL} FROM ${schema}.${table} WHERE name = $1 AND id = $2`
9851013
}
9861014

987-
function getQueueInClause (queues: string[]) {
988-
return queues.map(i => `'${i}'`).join(',')
989-
}
990-
9911015
export {
9921016
create,
9931017
insertVersion,

0 commit comments

Comments
 (0)