Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d5ed413

Browse files
committedJan 15, 2025··
feat(NODE-6258): add signal support to cursor APIs
1 parent e2aa15c commit d5ed413

File tree

24 files changed

+1235
-123
lines changed

24 files changed

+1235
-123
lines changed
 

‎src/client-side-encryption/auto_encrypter.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { kDecorateResult } from '../constants';
1111
import { getMongoDBClientEncryption } from '../deps';
1212
import { MongoRuntimeError } from '../error';
1313
import { MongoClient, type MongoClientOptions } from '../mongo_client';
14+
import { type Abortable } from '../mongo_types';
1415
import { MongoDBCollectionNamespace } from '../utils';
1516
import { autoSelectSocketOptions } from './client_encryption';
1617
import * as cryptoCallbacks from './crypto_callbacks';
@@ -372,8 +373,10 @@ export class AutoEncrypter {
372373
async encrypt(
373374
ns: string,
374375
cmd: Document,
375-
options: CommandOptions = {}
376+
options: CommandOptions & Abortable = {}
376377
): Promise<Document | Uint8Array> {
378+
options.signal?.throwIfAborted();
379+
377380
if (this._bypassEncryption) {
378381
// If `bypassAutoEncryption` has been specified, don't encrypt
379382
return cmd;
@@ -398,7 +401,7 @@ export class AutoEncrypter {
398401
socketOptions: autoSelectSocketOptions(this._client.s.options)
399402
});
400403

401-
return deserialize(await stateMachine.execute(this, context, options.timeoutContext), {
404+
return deserialize(await stateMachine.execute(this, context, options), {
402405
promoteValues: false,
403406
promoteLongs: false
404407
});
@@ -407,7 +410,12 @@ export class AutoEncrypter {
407410
/**
408411
* Decrypt a command response
409412
*/
410-
async decrypt(response: Uint8Array, options: CommandOptions = {}): Promise<Uint8Array> {
413+
async decrypt(
414+
response: Uint8Array,
415+
options: CommandOptions & Abortable = {}
416+
): Promise<Uint8Array> {
417+
options.signal?.throwIfAborted();
418+
411419
const context = this._mongocrypt.makeDecryptionContext(response);
412420

413421
context.id = this._contextCounter++;
@@ -419,11 +427,7 @@ export class AutoEncrypter {
419427
socketOptions: autoSelectSocketOptions(this._client.s.options)
420428
});
421429

422-
return await stateMachine.execute(
423-
this,
424-
context,
425-
options.timeoutContext?.csotEnabled() ? options.timeoutContext : undefined
426-
);
430+
return await stateMachine.execute(this, context, options);
427431
}
428432

429433
/**

‎src/client-side-encryption/client_encryption.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ export class ClientEncryption {
225225
TimeoutContext.create(resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS }));
226226

227227
const dataKey = deserialize(
228-
await stateMachine.execute(this, context, timeoutContext)
228+
await stateMachine.execute(this, context, { timeoutContext })
229229
) as DataKey;
230230

231231
const { db: dbName, collection: collectionName } = MongoDBCollectionNamespace.fromString(
@@ -293,7 +293,9 @@ export class ClientEncryption {
293293
resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS })
294294
);
295295

296-
const { v: dataKeys } = deserialize(await stateMachine.execute(this, context, timeoutContext));
296+
const { v: dataKeys } = deserialize(
297+
await stateMachine.execute(this, context, { timeoutContext })
298+
);
297299
if (dataKeys.length === 0) {
298300
return {};
299301
}
@@ -696,7 +698,7 @@ export class ClientEncryption {
696698
? TimeoutContext.create(resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS }))
697699
: undefined;
698700

699-
const { v } = deserialize(await stateMachine.execute(this, context, timeoutContext));
701+
const { v } = deserialize(await stateMachine.execute(this, context, { timeoutContext }));
700702

701703
return v;
702704
}
@@ -780,7 +782,7 @@ export class ClientEncryption {
780782
this._timeoutMS != null
781783
? TimeoutContext.create(resolveTimeoutOptions(this._client, { timeoutMS: this._timeoutMS }))
782784
: undefined;
783-
const { v } = deserialize(await stateMachine.execute(this, context, timeoutContext));
785+
const { v } = deserialize(await stateMachine.execute(this, context, { timeoutContext }));
784786
return v;
785787
}
786788
}

‎src/client-side-encryption/state_machine.ts

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@ import { CursorTimeoutContext } from '../cursor/abstract_cursor';
1515
import { getSocks, type SocksLib } from '../deps';
1616
import { MongoOperationTimeoutError } from '../error';
1717
import { type MongoClient, type MongoClientOptions } from '../mongo_client';
18+
import { type Abortable } from '../mongo_types';
1819
import { Timeout, type TimeoutContext, TimeoutError } from '../timeout';
19-
import { BufferPool, MongoDBCollectionNamespace, promiseWithResolvers } from '../utils';
20+
import {
21+
addAbortListener,
22+
BufferPool,
23+
kDispose,
24+
MongoDBCollectionNamespace,
25+
promiseWithResolvers
26+
} from '../utils';
2027
import { autoSelectSocketOptions, type DataKey } from './client_encryption';
2128
import { MongoCryptError } from './errors';
2229
import { type MongocryptdManager } from './mongocryptd_manager';
@@ -189,7 +196,7 @@ export class StateMachine {
189196
async execute(
190197
executor: StateMachineExecutable,
191198
context: MongoCryptContext,
192-
timeoutContext?: TimeoutContext
199+
options: { timeoutContext?: TimeoutContext } & Abortable
193200
): Promise<Uint8Array> {
194201
const keyVaultNamespace = executor._keyVaultNamespace;
195202
const keyVaultClient = executor._keyVaultClient;
@@ -199,6 +206,7 @@ export class StateMachine {
199206
let result: Uint8Array | null = null;
200207

201208
while (context.state !== MONGOCRYPT_CTX_DONE && context.state !== MONGOCRYPT_CTX_ERROR) {
209+
options.signal?.throwIfAborted();
202210
debug(`[context#${context.id}] ${stateToString.get(context.state) || context.state}`);
203211

204212
switch (context.state) {
@@ -214,7 +222,7 @@ export class StateMachine {
214222
metaDataClient,
215223
context.ns,
216224
filter,
217-
timeoutContext
225+
options
218226
);
219227
if (collInfo) {
220228
context.addMongoOperationResponse(collInfo);
@@ -235,9 +243,9 @@ export class StateMachine {
235243
// When we are using the shared library, we don't have a mongocryptd manager.
236244
const markedCommand: Uint8Array = mongocryptdManager
237245
? await mongocryptdManager.withRespawn(
238-
this.markCommand.bind(this, mongocryptdClient, context.ns, command, timeoutContext)
246+
this.markCommand.bind(this, mongocryptdClient, context.ns, command, options)
239247
)
240-
: await this.markCommand(mongocryptdClient, context.ns, command, timeoutContext);
248+
: await this.markCommand(mongocryptdClient, context.ns, command, options);
241249

242250
context.addMongoOperationResponse(markedCommand);
243251
context.finishMongoOperation();
@@ -246,12 +254,7 @@ export class StateMachine {
246254

247255
case MONGOCRYPT_CTX_NEED_MONGO_KEYS: {
248256
const filter = context.nextMongoOperation();
249-
const keys = await this.fetchKeys(
250-
keyVaultClient,
251-
keyVaultNamespace,
252-
filter,
253-
timeoutContext
254-
);
257+
const keys = await this.fetchKeys(keyVaultClient, keyVaultNamespace, filter, options);
255258

256259
if (keys.length === 0) {
257260
// See docs on EMPTY_V
@@ -273,7 +276,7 @@ export class StateMachine {
273276
}
274277

275278
case MONGOCRYPT_CTX_NEED_KMS: {
276-
await Promise.all(this.requests(context, timeoutContext));
279+
await Promise.all(this.requests(context, options));
277280
context.finishKMSRequests();
278281
break;
279282
}
@@ -315,11 +318,13 @@ export class StateMachine {
315318
* @param kmsContext - A C++ KMS context returned from the bindings
316319
* @returns A promise that resolves when the KMS reply has be fully parsed
317320
*/
318-
async kmsRequest(request: MongoCryptKMSRequest, timeoutContext?: TimeoutContext): Promise<void> {
321+
async kmsRequest(
322+
request: MongoCryptKMSRequest,
323+
options?: { timeoutContext?: TimeoutContext } & Abortable
324+
): Promise<void> {
319325
const parsedUrl = request.endpoint.split(':');
320326
const port = parsedUrl[1] != null ? Number.parseInt(parsedUrl[1], 10) : HTTPS_PORT;
321-
const socketOptions = autoSelectSocketOptions(this.options.socketOptions || {});
322-
const options: tls.ConnectionOptions & {
327+
const socketOptions: tls.ConnectionOptions & {
323328
host: string;
324329
port: number;
325330
autoSelectFamily?: boolean;
@@ -328,7 +333,7 @@ export class StateMachine {
328333
host: parsedUrl[0],
329334
servername: parsedUrl[0],
330335
port,
331-
...socketOptions
336+
...autoSelectSocketOptions(this.options.socketOptions || {})
332337
};
333338
const message = request.message;
334339
const buffer = new BufferPool();
@@ -363,7 +368,7 @@ export class StateMachine {
363368
throw error;
364369
}
365370
try {
366-
await this.setTlsOptions(providerTlsOptions, options);
371+
await this.setTlsOptions(providerTlsOptions, socketOptions);
367372
} catch (err) {
368373
throw onerror(err);
369374
}
@@ -380,23 +385,25 @@ export class StateMachine {
380385
.once('close', () => rejectOnNetSocketError(onclose()))
381386
.once('connect', () => resolveOnNetSocketConnect());
382387

388+
let abortListener;
389+
383390
try {
384391
if (this.options.proxyOptions && this.options.proxyOptions.proxyHost) {
385392
const netSocketOptions = {
393+
...socketOptions,
386394
host: this.options.proxyOptions.proxyHost,
387-
port: this.options.proxyOptions.proxyPort || 1080,
388-
...socketOptions
395+
port: this.options.proxyOptions.proxyPort || 1080
389396
};
390397
netSocket.connect(netSocketOptions);
391398
await willConnect;
392399

393400
try {
394401
socks ??= loadSocks();
395-
options.socket = (
402+
socketOptions.socket = (
396403
await socks.SocksClient.createConnection({
397404
existing_socket: netSocket,
398405
command: 'connect',
399-
destination: { host: options.host, port: options.port },
406+
destination: { host: socketOptions.host, port: socketOptions.port },
400407
proxy: {
401408
// host and port are ignored because we pass existing_socket
402409
host: 'iLoveJavaScript',
@@ -412,7 +419,7 @@ export class StateMachine {
412419
}
413420
}
414421

415-
socket = tls.connect(options, () => {
422+
socket = tls.connect(socketOptions, () => {
416423
socket.write(message);
417424
});
418425

@@ -422,6 +429,11 @@ export class StateMachine {
422429
resolve
423430
} = promiseWithResolvers<void>();
424431

432+
abortListener = addAbortListener(options?.signal, function () {
433+
destroySockets();
434+
rejectOnTlsSocketError(this.reason);
435+
});
436+
425437
socket
426438
.once('error', err => rejectOnTlsSocketError(onerror(err)))
427439
.once('close', () => rejectOnTlsSocketError(onclose()))
@@ -436,8 +448,11 @@ export class StateMachine {
436448
resolve();
437449
}
438450
});
439-
await (timeoutContext?.csotEnabled()
440-
? Promise.all([willResolveKmsRequest, Timeout.expires(timeoutContext?.remainingTimeMS)])
451+
await (options?.timeoutContext?.csotEnabled()
452+
? Promise.all([
453+
willResolveKmsRequest,
454+
Timeout.expires(options.timeoutContext?.remainingTimeMS)
455+
])
441456
: willResolveKmsRequest);
442457
} catch (error) {
443458
if (error instanceof TimeoutError)
@@ -446,16 +461,17 @@ export class StateMachine {
446461
} finally {
447462
// There's no need for any more activity on this socket at this point.
448463
destroySockets();
464+
abortListener?.[kDispose]();
449465
}
450466
}
451467

452-
*requests(context: MongoCryptContext, timeoutContext?: TimeoutContext) {
468+
*requests(context: MongoCryptContext, options?: { timeoutContext?: TimeoutContext } & Abortable) {
453469
for (
454470
let request = context.nextKMSRequest();
455471
request != null;
456472
request = context.nextKMSRequest()
457473
) {
458-
yield this.kmsRequest(request, timeoutContext);
474+
yield this.kmsRequest(request, options);
459475
}
460476
}
461477

@@ -516,14 +532,16 @@ export class StateMachine {
516532
client: MongoClient,
517533
ns: string,
518534
filter: Document,
519-
timeoutContext?: TimeoutContext
535+
options?: { timeoutContext?: TimeoutContext } & Abortable
520536
): Promise<Uint8Array | null> {
521537
const { db } = MongoDBCollectionNamespace.fromString(ns);
522538

523539
const cursor = client.db(db).listCollections(filter, {
524540
promoteLongs: false,
525541
promoteValues: false,
526-
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
542+
timeoutContext:
543+
options?.timeoutContext && new CursorTimeoutContext(options?.timeoutContext, Symbol()),
544+
signal: options?.signal
527545
});
528546

529547
// There is always exactly zero or one matching documents, so this should always exhaust the cursor
@@ -547,17 +565,30 @@ export class StateMachine {
547565
client: MongoClient,
548566
ns: string,
549567
command: Uint8Array,
550-
timeoutContext?: TimeoutContext
568+
options?: { timeoutContext?: TimeoutContext } & Abortable
551569
): Promise<Uint8Array> {
552570
const { db } = MongoDBCollectionNamespace.fromString(ns);
553571
const bsonOptions = { promoteLongs: false, promoteValues: false };
554572
const rawCommand = deserialize(command, bsonOptions);
555573

574+
const commandOptions: {
575+
timeoutMS?: number;
576+
signal?: AbortSignal;
577+
} = {
578+
timeoutMS: undefined,
579+
signal: undefined
580+
};
581+
582+
if (options?.timeoutContext?.csotEnabled()) {
583+
commandOptions.timeoutMS = options.timeoutContext.remainingTimeMS;
584+
}
585+
if (options?.signal) {
586+
commandOptions.signal = options.signal;
587+
}
588+
556589
const response = await client.db(db).command(rawCommand, {
557590
...bsonOptions,
558-
...(timeoutContext?.csotEnabled()
559-
? { timeoutMS: timeoutContext?.remainingTimeMS }
560-
: undefined)
591+
...commandOptions
561592
});
562593

563594
return serialize(response, this.bsonOptions);
@@ -575,17 +606,30 @@ export class StateMachine {
575606
client: MongoClient,
576607
keyVaultNamespace: string,
577608
filter: Uint8Array,
578-
timeoutContext?: TimeoutContext
609+
options?: { timeoutContext?: TimeoutContext } & Abortable
579610
): Promise<Array<DataKey>> {
580611
const { db: dbName, collection: collectionName } =
581612
MongoDBCollectionNamespace.fromString(keyVaultNamespace);
582613

614+
const commandOptions: {
615+
timeoutContext?: CursorTimeoutContext;
616+
signal?: AbortSignal;
617+
} = {
618+
timeoutContext: undefined,
619+
signal: undefined
620+
};
621+
622+
if (options?.timeoutContext != null) {
623+
commandOptions.timeoutContext = new CursorTimeoutContext(options.timeoutContext, Symbol());
624+
}
625+
if (options?.signal != null) {
626+
commandOptions.signal = options.signal;
627+
}
628+
583629
return client
584630
.db(dbName)
585631
.collection<DataKey>(collectionName, { readConcern: { level: 'majority' } })
586-
.find(deserialize(filter), {
587-
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
588-
})
632+
.find(deserialize(filter), commandOptions)
589633
.toArray();
590634
}
591635
}

‎src/cmap/connection.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client';
3434
import { type MongoClientAuthProviders } from '../mongo_client_auth_providers';
3535
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
36-
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
36+
import { type Abortable, type CancellationToken, TypedEventEmitter } from '../mongo_types';
3737
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3838
import { ServerType } from '../sdam/common';
3939
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
@@ -438,7 +438,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
438438

439439
private async *sendWire(
440440
message: WriteProtocolMessageType,
441-
options: CommandOptions,
441+
options: CommandOptions & Abortable,
442442
responseType?: MongoDBResponseConstructor
443443
): AsyncGenerator<MongoDBResponse> {
444444
this.throwIfAborted();
@@ -453,7 +453,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
453453
await this.writeCommand(message, {
454454
agreedCompressor: this.description.compressor ?? 'none',
455455
zlibCompressionLevel: this.description.zlibCompressionLevel,
456-
timeoutContext: options.timeoutContext
456+
timeoutContext: options.timeoutContext,
457+
signal: options.signal
457458
});
458459

459460
if (options.noResponse || message.moreToCome) {
@@ -473,7 +474,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
473474
);
474475
}
475476

476-
for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
477+
for await (const response of this.readMany({
478+
timeoutContext: options.timeoutContext,
479+
signal: options.signal
480+
})) {
477481
this.socket.setTimeout(0);
478482
const bson = response.parse();
479483

@@ -676,7 +680,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
676680
agreedCompressor?: CompressorName;
677681
zlibCompressionLevel?: number;
678682
timeoutContext?: TimeoutContext;
679-
}
683+
} & Abortable
680684
): Promise<void> {
681685
const finalCommand =
682686
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
@@ -701,7 +705,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
701705

702706
if (this.socket.write(buffer)) return;
703707

704-
const drainEvent = once<void>(this.socket, 'drain');
708+
const drainEvent = once<void>(this.socket, 'drain', { signal: options.signal });
705709
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
706710
if (timeout) {
707711
try {
@@ -729,9 +733,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
729733
*
730734
* Note that `for-await` loops call `return` automatically when the loop is exited.
731735
*/
732-
private async *readMany(options: {
733-
timeoutContext?: TimeoutContext;
734-
}): AsyncGenerator<OpMsgResponse | OpReply> {
736+
private async *readMany(
737+
options: {
738+
timeoutContext?: TimeoutContext;
739+
} & Abortable
740+
): AsyncGenerator<OpMsgResponse | OpReply> {
735741
try {
736742
this.dataEvents = onData(this.messageStream, options);
737743
this.messageStream.resume();

‎src/cmap/connection_pool.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,18 @@ import {
2525
MongoRuntimeError,
2626
MongoServerError
2727
} from '../error';
28-
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
28+
import { type Abortable, CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
3030
import { type TimeoutContext, TimeoutError } from '../timeout';
31-
import { type Callback, List, makeCounter, now, promiseWithResolvers } from '../utils';
31+
import {
32+
addAbortListener,
33+
type Callback,
34+
kDispose,
35+
List,
36+
makeCounter,
37+
now,
38+
promiseWithResolvers
39+
} from '../utils';
3240
import { connect } from './connect';
3341
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3442
import {
@@ -316,7 +324,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
316324
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
317325
* explicitly destroyed by the new owner.
318326
*/
319-
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
327+
async checkOut(options: { timeoutContext: TimeoutContext } & Abortable): Promise<Connection> {
320328
const checkoutTime = now();
321329
this.emitAndLog(
322330
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
@@ -334,6 +342,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
334342
checkoutTime
335343
};
336344

345+
const abortListener = addAbortListener(options.signal, function () {
346+
waitQueueMember.cancelled = true;
347+
reject(this.reason);
348+
});
349+
337350
this.waitQueue.push(waitQueueMember);
338351
process.nextTick(() => this.processWaitQueue());
339352

@@ -364,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
364377
}
365378
throw error;
366379
} finally {
380+
abortListener?.[kDispose]();
367381
timeout?.clear();
368382
}
369383
}

‎src/cmap/wire_protocol/on_data.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { type EventEmitter } from 'events';
22

3+
import { type Abortable } from '../../mongo_types';
34
import { type TimeoutContext } from '../../timeout';
4-
import { List, promiseWithResolvers } from '../../utils';
5+
import { addAbortListener, kDispose, List, promiseWithResolvers } from '../../utils';
56

67
/**
78
* @internal
@@ -21,8 +22,10 @@ type PendingPromises = Omit<
2122
*/
2223
export function onData(
2324
emitter: EventEmitter,
24-
{ timeoutContext }: { timeoutContext?: TimeoutContext }
25+
{ timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
2526
) {
27+
signal?.throwIfAborted();
28+
2629
// Setup pending events and pending promise lists
2730
/**
2831
* When the caller has not yet called .next(), we store the
@@ -90,6 +93,9 @@ export function onData(
9093
// Adding event handlers
9194
emitter.on('data', eventHandler);
9295
emitter.on('error', errorHandler);
96+
const abortListener = addAbortListener(signal, function () {
97+
errorHandler(this.reason);
98+
});
9399

94100
const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
95101
timeoutForSocketRead?.throwIfExpired();
@@ -115,6 +121,7 @@ export function onData(
115121
// Adding event handlers
116122
emitter.off('data', eventHandler);
117123
emitter.off('error', errorHandler);
124+
abortListener?.[kDispose]();
118125
finished = true;
119126
timeoutForSocketRead?.clear();
120127
const doneResult = { value: undefined, done: finished } as const;

‎src/collection.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type { Db } from './db';
1414
import { MongoInvalidArgumentError, MongoOperationTimeoutError } from './error';
1515
import type { MongoClient, PkFactory } from './mongo_client';
1616
import type {
17+
Abortable,
1718
Filter,
1819
Flatten,
1920
OptionalUnlessRequiredId,
@@ -505,20 +506,20 @@ export class Collection<TSchema extends Document = Document> {
505506
async findOne(filter: Filter<TSchema>): Promise<WithId<TSchema> | null>;
506507
async findOne(
507508
filter: Filter<TSchema>,
508-
options: Omit<FindOptions, 'timeoutMode'>
509+
options: Omit<FindOptions, 'timeoutMode'> & Abortable
509510
): Promise<WithId<TSchema> | null>;
510511

511512
// allow an override of the schema.
512513
async findOne<T = TSchema>(): Promise<T | null>;
513514
async findOne<T = TSchema>(filter: Filter<TSchema>): Promise<T | null>;
514515
async findOne<T = TSchema>(
515516
filter: Filter<TSchema>,
516-
options?: Omit<FindOptions, 'timeoutMode'>
517+
options?: Omit<FindOptions, 'timeoutMode'> & Abortable
517518
): Promise<T | null>;
518519

519520
async findOne(
520521
filter: Filter<TSchema> = {},
521-
options: FindOptions = {}
522+
options: FindOptions & Abortable = {}
522523
): Promise<WithId<TSchema> | null> {
523524
const cursor = this.find(filter, options).limit(-1).batchSize(1);
524525
const res = await cursor.next();
@@ -532,9 +533,15 @@ export class Collection<TSchema extends Document = Document> {
532533
* @param filter - The filter predicate. If unspecified, then all documents in the collection will match the predicate
533534
*/
534535
find(): FindCursor<WithId<TSchema>>;
535-
find(filter: Filter<TSchema>, options?: FindOptions): FindCursor<WithId<TSchema>>;
536-
find<T extends Document>(filter: Filter<TSchema>, options?: FindOptions): FindCursor<T>;
537-
find(filter: Filter<TSchema> = {}, options: FindOptions = {}): FindCursor<WithId<TSchema>> {
536+
find(filter: Filter<TSchema>, options?: FindOptions & Abortable): FindCursor<WithId<TSchema>>;
537+
find<T extends Document>(
538+
filter: Filter<TSchema>,
539+
options?: FindOptions & Abortable
540+
): FindCursor<T>;
541+
find(
542+
filter: Filter<TSchema> = {},
543+
options: FindOptions & Abortable = {}
544+
): FindCursor<WithId<TSchema>> {
538545
return new FindCursor<WithId<TSchema>>(
539546
this.client,
540547
this.s.namespace,
@@ -792,7 +799,7 @@ export class Collection<TSchema extends Document = Document> {
792799
*/
793800
async countDocuments(
794801
filter: Filter<TSchema> = {},
795-
options: CountDocumentsOptions = {}
802+
options: CountDocumentsOptions & Abortable = {}
796803
): Promise<number> {
797804
const pipeline = [];
798805
pipeline.push({ $match: filter });
@@ -1006,7 +1013,7 @@ export class Collection<TSchema extends Document = Document> {
10061013
*/
10071014
aggregate<T extends Document = Document>(
10081015
pipeline: Document[] = [],
1009-
options?: AggregateOptions
1016+
options?: AggregateOptions & Abortable
10101017
): AggregationCursor<T> {
10111018
if (!Array.isArray(pipeline)) {
10121019
throw new MongoInvalidArgumentError(

‎src/cursor/abstract_cursor.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
MongoTailableCursorError
1313
} from '../error';
1414
import type { MongoClient } from '../mongo_client';
15-
import { TypedEventEmitter } from '../mongo_types';
15+
import { type Abortable, TypedEventEmitter } from '../mongo_types';
1616
import { executeOperation } from '../operations/execute_operation';
1717
import { GetMoreOperation } from '../operations/get_more';
1818
import { KillCursorsOperation } from '../operations/kill_cursors';
@@ -22,7 +22,13 @@ import { type AsyncDisposable, configureResourceManagement } from '../resource_m
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
2424
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
25-
import { type MongoDBNamespace, squashError } from '../utils';
25+
import {
26+
addAbortListener,
27+
type Disposable,
28+
kDispose,
29+
type MongoDBNamespace,
30+
squashError
31+
} from '../utils';
2632

2733
/**
2834
* @internal
@@ -247,12 +253,14 @@ export abstract class AbstractCursor<
247253

248254
/** @internal */
249255
protected deserializationOptions: OnDemandDocumentDeserializeOptions;
256+
protected signal: AbortSignal | undefined;
257+
private abortListener: Disposable | undefined;
250258

251259
/** @internal */
252260
protected constructor(
253261
client: MongoClient,
254262
namespace: MongoDBNamespace,
255-
options: AbstractCursorOptions = {}
263+
options: AbstractCursorOptions & Abortable = {}
256264
) {
257265
super();
258266

@@ -352,6 +360,9 @@ export abstract class AbstractCursor<
352360
};
353361

354362
this.timeoutContext = options.timeoutContext;
363+
this.signal = options.signal;
364+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
365+
this.abortListener = addAbortListener(this.signal, this.close.bind(this, undefined));
355366
}
356367

357368
/**
@@ -455,6 +466,8 @@ export abstract class AbstractCursor<
455466
}
456467

457468
async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> {
469+
this.signal?.throwIfAborted();
470+
458471
if (this.closed) {
459472
return;
460473
}
@@ -481,6 +494,8 @@ export abstract class AbstractCursor<
481494
}
482495

483496
yield document;
497+
498+
this.signal?.throwIfAborted();
484499
}
485500
} finally {
486501
// Only close the cursor if it has not already been closed. This finally clause handles
@@ -496,9 +511,16 @@ export abstract class AbstractCursor<
496511
}
497512

498513
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
514+
const readable = new ReadableCursorStream(this);
515+
const abortListener = addAbortListener(this.signal, function () {
516+
readable.destroy(this.reason);
517+
});
518+
readable.once('end', () => {
519+
abortListener?.[kDispose]();
520+
});
521+
499522
if (options?.transform) {
500523
const transform = options.transform;
501-
const readable = new ReadableCursorStream(this);
502524

503525
const transformedStream = readable.pipe(
504526
new Transform({
@@ -522,10 +544,12 @@ export abstract class AbstractCursor<
522544
return transformedStream;
523545
}
524546

525-
return new ReadableCursorStream(this);
547+
return readable;
526548
}
527549

528550
async hasNext(): Promise<boolean> {
551+
this.signal?.throwIfAborted();
552+
529553
if (this.cursorId === Long.ZERO) {
530554
return false;
531555
}
@@ -551,6 +575,8 @@ export abstract class AbstractCursor<
551575

552576
/** Get the next available document from the cursor, returns null if no more documents are available. */
553577
async next(): Promise<TSchema | null> {
578+
this.signal?.throwIfAborted();
579+
554580
if (this.cursorId === Long.ZERO) {
555581
throw new MongoCursorExhaustedError();
556582
}
@@ -581,6 +607,8 @@ export abstract class AbstractCursor<
581607
* Try to get the next available document from the cursor or `null` if an empty batch is returned
582608
*/
583609
async tryNext(): Promise<TSchema | null> {
610+
this.signal?.throwIfAborted();
611+
584612
if (this.cursorId === Long.ZERO) {
585613
throw new MongoCursorExhaustedError();
586614
}
@@ -620,6 +648,8 @@ export abstract class AbstractCursor<
620648
* @deprecated - Will be removed in a future release. Use for await...of instead.
621649
*/
622650
async forEach(iterator: (doc: TSchema) => boolean | void): Promise<void> {
651+
this.signal?.throwIfAborted();
652+
623653
if (typeof iterator !== 'function') {
624654
throw new MongoInvalidArgumentError('Argument "iterator" must be a function');
625655
}
@@ -645,6 +675,8 @@ export abstract class AbstractCursor<
645675
* cursor.rewind() can be used to reset the cursor.
646676
*/
647677
async toArray(): Promise<TSchema[]> {
678+
this.signal?.throwIfAborted();
679+
648680
const array: TSchema[] = [];
649681
// at the end of the loop (since readBufferedDocuments is called) the buffer will be empty
650682
// then, the 'await of' syntax will run a getMore call
@@ -968,6 +1000,7 @@ export abstract class AbstractCursor<
9681000

9691001
/** @internal */
9701002
private async cleanup(timeoutMS?: number, error?: Error) {
1003+
this.abortListener?.[kDispose]();
9711004
this.isClosed = true;
9721005
const session = this.cursorSession;
9731006
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {

‎src/cursor/aggregation_cursor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
validateExplainTimeoutOptions
99
} from '../explain';
1010
import type { MongoClient } from '../mongo_client';
11+
import { type Abortable } from '../mongo_types';
1112
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
1213
import { executeOperation } from '../operations/execute_operation';
1314
import type { ClientSession } from '../sessions';
@@ -39,7 +40,7 @@ export class AggregationCursor<TSchema = any> extends ExplainableCursor<TSchema>
3940
client: MongoClient,
4041
namespace: MongoDBNamespace,
4142
pipeline: Document[] = [],
42-
options: AggregateOptions = {}
43+
options: AggregateOptions & Abortable = {}
4344
) {
4445
super(client, namespace, options);
4546

‎src/cursor/find_cursor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ export class FindCursor<TSchema = any> extends ExplainableCursor<TSchema> {
7272
const options = {
7373
...this.findOptions, // NOTE: order matters here, we may need to refine this
7474
...this.cursorOptions,
75-
session
75+
session,
76+
signal: this.signal
7677
};
7778

7879
if (options.explain) {

‎src/db.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { ListCollectionsCursor } from './cursor/list_collections_cursor';
88
import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor';
99
import { MongoInvalidArgumentError } from './error';
1010
import type { MongoClient, PkFactory } from './mongo_client';
11-
import type { TODO_NODE_3286 } from './mongo_types';
11+
import type { Abortable, TODO_NODE_3286 } from './mongo_types';
1212
import type { AggregateOptions } from './operations/aggregate';
1313
import { CollectionsOperation } from './operations/collections';
1414
import {
@@ -273,7 +273,7 @@ export class Db {
273273
* @param command - The command to run
274274
* @param options - Optional settings for the command
275275
*/
276-
async command(command: Document, options?: RunCommandOptions): Promise<Document> {
276+
async command(command: Document, options?: RunCommandOptions & Abortable): Promise<Document> {
277277
// Intentionally, we do not inherit options from parent for this operation.
278278
return await executeOperation(
279279
this.client,
@@ -284,7 +284,8 @@ export class Db {
284284
...resolveBSONOptions(options),
285285
timeoutMS: options?.timeoutMS ?? this.timeoutMS,
286286
session: options?.session,
287-
readPreference: options?.readPreference
287+
readPreference: options?.readPreference,
288+
signal: options?.signal
288289
})
289290
)
290291
);
@@ -351,22 +352,25 @@ export class Db {
351352
*/
352353
listCollections(
353354
filter: Document,
354-
options: Exclude<ListCollectionsOptions, 'nameOnly'> & { nameOnly: true }
355+
options: Exclude<ListCollectionsOptions, 'nameOnly'> & { nameOnly: true } & Abortable
355356
): ListCollectionsCursor<Pick<CollectionInfo, 'name' | 'type'>>;
356357
listCollections(
357358
filter: Document,
358-
options: Exclude<ListCollectionsOptions, 'nameOnly'> & { nameOnly: false }
359+
options: Exclude<ListCollectionsOptions, 'nameOnly'> & { nameOnly: false } & Abortable
359360
): ListCollectionsCursor<CollectionInfo>;
360361
listCollections<
361362
T extends Pick<CollectionInfo, 'name' | 'type'> | CollectionInfo =
362363
| Pick<CollectionInfo, 'name' | 'type'>
363364
| CollectionInfo
364-
>(filter?: Document, options?: ListCollectionsOptions): ListCollectionsCursor<T>;
365+
>(filter?: Document, options?: ListCollectionsOptions & Abortable): ListCollectionsCursor<T>;
365366
listCollections<
366367
T extends Pick<CollectionInfo, 'name' | 'type'> | CollectionInfo =
367368
| Pick<CollectionInfo, 'name' | 'type'>
368369
| CollectionInfo
369-
>(filter: Document = {}, options: ListCollectionsOptions = {}): ListCollectionsCursor<T> {
370+
>(
371+
filter: Document = {},
372+
options: ListCollectionsOptions & Abortable = {}
373+
): ListCollectionsCursor<T> {
370374
return new ListCollectionsCursor<T>(this, filter, resolveOptions(this, options));
371375
}
372376

‎src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ export type {
430430
SeverityLevel
431431
} from './mongo_logger';
432432
export type {
433+
Abortable,
433434
CommonEvents,
434435
EventsDescription,
435436
GenericListener,

‎src/mongo_types.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,38 @@ export class TypedEventEmitter<Events extends EventsDescription> extends EventEm
474474
/** @public */
475475
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
476476

477+
/** @public */
478+
export type Abortable = {
479+
/**
480+
* When provided the corresponding `AbortController` can be used to cancel an asynchronous action.
481+
*
482+
* The driver will convert the abort event into a promise rejection with an error that has the name `'AbortError'`.
483+
*
484+
* The cause of the error will be set to `signal.reason`
485+
*
486+
* @example
487+
* ```js
488+
* const controller = new AbortController();
489+
* const { signal } = controller;
490+
* req,on('close', () => controller.abort(new Error('Request aborted by user')));
491+
*
492+
* try {
493+
* const res = await fetch('...', { signal });
494+
* await collection.insertOne(await res.json(), { signal });
495+
* catch (error) {
496+
* if (error.name === 'AbortError') {
497+
* // error is MongoAbortError or DOMException,
498+
* // both represent the signal being aborted
499+
* error.cause === signal.reason; // true
500+
* }
501+
* }
502+
* ```
503+
*
504+
* @see MongoAbortError
505+
*/
506+
signal?: AbortSignal | undefined;
507+
};
508+
477509
/**
478510
* Helper types for dot-notation filter attributes
479511
*/

‎src/operations/execute_operation.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ export async function executeOperation<
6464
throw new MongoRuntimeError('This method requires a valid operation instance');
6565
}
6666

67+
// Like CSOT, an operation signal interruption does not relate to auto-connect
68+
operation.options.signal?.throwIfAborted();
6769
const topology = await autoConnect(client);
70+
operation.options.signal?.throwIfAborted();
6871

6972
// The driver sessions spec mandates that we implicitly create sessions for operations
7073
// that are not explicitly provided with a session.
@@ -198,7 +201,8 @@ async function tryOperation<
198201
let server = await topology.selectServer(selector, {
199202
session,
200203
operationName: operation.commandName,
201-
timeoutContext
204+
timeoutContext,
205+
signal: operation.options.signal
202206
});
203207

204208
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -260,7 +264,8 @@ async function tryOperation<
260264
server = await topology.selectServer(selector, {
261265
session,
262266
operationName: operation.commandName,
263-
previousServer
267+
previousServer,
268+
signal: operation.options.signal
264269
});
265270

266271
if (hasWriteAspect && !supportsRetryableWrites(server)) {

‎src/operations/list_collections.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Binary, Document } from '../bson';
22
import { CursorResponse } from '../cmap/wire_protocol/responses';
33
import { type CursorTimeoutContext, type CursorTimeoutMode } from '../cursor/abstract_cursor';
44
import type { Db } from '../db';
5+
import { type Abortable } from '../mongo_types';
56
import type { Server } from '../sdam/server';
67
import type { ClientSession } from '../sessions';
78
import { type TimeoutContext } from '../timeout';
@@ -10,7 +11,9 @@ import { CommandOperation, type CommandOperationOptions } from './command';
1011
import { Aspect, defineAspects } from './operation';
1112

1213
/** @public */
13-
export interface ListCollectionsOptions extends Omit<CommandOperationOptions, 'writeConcern'> {
14+
export interface ListCollectionsOptions
15+
extends Omit<CommandOperationOptions, 'writeConcern'>,
16+
Abortable {
1417
/** Since 4.0: If true, will only return the collection name in the response, and will omit additional info */
1518
nameOnly?: boolean;
1619
/** Since 4.0: If true and nameOnly is true, allows a user without the required privilege (i.e. listCollections action on the database) to run the command when access control is enforced. */

‎src/operations/operation.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
2+
import { type Abortable } from '../mongo_types';
23
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
34
import type { Server } from '../sdam/server';
45
import type { ClientSession } from '../sessions';
@@ -59,7 +60,7 @@ export abstract class AbstractOperation<TResult = any> {
5960
// BSON serialization options
6061
bsonOptions?: BSONSerializeOptions;
6162

62-
options: OperationOptions;
63+
options: OperationOptions & Abortable;
6364

6465
/** Specifies the time an operation will run until it throws a timeout error. */
6566
timeoutMS?: number;
@@ -68,7 +69,7 @@ export abstract class AbstractOperation<TResult = any> {
6869

6970
static aspects?: Set<symbol>;
7071

71-
constructor(options: OperationOptions = {}) {
72+
constructor(options: OperationOptions & Abortable = {}) {
7273
this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
7374
? ReadPreference.primary
7475
: (ReadPreference.fromOptions(options) ?? ReadPreference.primary);

‎src/sdam/server.ts

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import {
3636
needsRetryableWriteLabel
3737
} from '../error';
3838
import type { ServerApi } from '../mongo_client';
39-
import { TypedEventEmitter } from '../mongo_types';
39+
import { type Abortable, TypedEventEmitter } from '../mongo_types';
4040
import type { GetMoreOptions } from '../operations/get_more';
4141
import type { ClientSession } from '../sessions';
4242
import { type TimeoutContext } from '../timeout';
@@ -107,7 +107,7 @@ export type ServerEvents = {
107107
/** @internal */
108108
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
109109
timeoutContext: TimeoutContext;
110-
};
110+
} & Abortable;
111111

112112
/** @internal */
113113
export class Server extends TypedEventEmitter<ServerEvents> {
@@ -285,7 +285,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
285285
public async command(
286286
ns: MongoDBNamespace,
287287
cmd: Document,
288-
options: ServerCommandOptions,
288+
paramOpts: ServerCommandOptions,
289289
responseType?: MongoDBResponseConstructor
290290
): Promise<Document> {
291291
if (ns.db == null || typeof ns === 'string') {
@@ -297,24 +297,25 @@ export class Server extends TypedEventEmitter<ServerEvents> {
297297
}
298298

299299
// Clone the options
300-
const finalOptions = Object.assign({}, options, {
300+
const options = {
301+
...paramOpts,
301302
wireProtocolCommand: false,
302303
directConnection: this.topology.s.options.directConnection
303-
});
304+
};
304305

305306
// There are cases where we need to flag the read preference not to get sent in
306307
// the command, such as pre-5.0 servers attempting to perform an aggregate write
307308
// with a non-primary read preference. In this case the effective read preference
308309
// (primary) is not the same as the provided and must be removed completely.
309-
if (finalOptions.omitReadPreference) {
310-
delete finalOptions.readPreference;
310+
if (options.omitReadPreference) {
311+
delete options.readPreference;
311312
}
312313

313314
if (this.description.iscryptd) {
314-
finalOptions.omitMaxTimeMS = true;
315+
options.omitMaxTimeMS = true;
315316
}
316317

317-
const session = finalOptions.session;
318+
const session = options.session;
318319
let conn = session?.pinnedConnection;
319320

320321
this.incrementOperationCount();
@@ -333,11 +334,11 @@ export class Server extends TypedEventEmitter<ServerEvents> {
333334

334335
try {
335336
try {
336-
const res = await conn.command(ns, cmd, finalOptions, responseType);
337+
const res = await conn.command(ns, cmd, options, responseType);
337338
throwIfWriteConcernError(res);
338339
return res;
339340
} catch (commandError) {
340-
throw this.decorateCommandError(conn, cmd, finalOptions, commandError);
341+
throw this.decorateCommandError(conn, cmd, options, commandError);
341342
}
342343
} catch (operationError) {
343344
if (
@@ -346,11 +347,11 @@ export class Server extends TypedEventEmitter<ServerEvents> {
346347
) {
347348
await this.pool.reauthenticate(conn);
348349
try {
349-
const res = await conn.command(ns, cmd, finalOptions, responseType);
350+
const res = await conn.command(ns, cmd, options, responseType);
350351
throwIfWriteConcernError(res);
351352
return res;
352353
} catch (commandError) {
353-
throw this.decorateCommandError(conn, cmd, finalOptions, commandError);
354+
throw this.decorateCommandError(conn, cmd, options, commandError);
354355
}
355356
} else {
356357
throw operationError;

‎src/sdam/topology.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@ import {
3131
} from '../error';
3232
import type { MongoClient, ServerApi } from '../mongo_client';
3333
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
34-
import { TypedEventEmitter } from '../mongo_types';
34+
import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
3838
import type { Transaction } from '../transactions';
3939
import {
40+
addAbortListener,
4041
type Callback,
4142
type EventEmitterWithState,
4243
HostAddress,
44+
kDispose,
4345
List,
4446
makeStateMachine,
4547
now,
@@ -525,7 +527,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
525527
*/
526528
async selectServer(
527529
selector: string | ReadPreference | ServerSelector,
528-
options: SelectServerOptions
530+
options: SelectServerOptions & Abortable
529531
): Promise<Server> {
530532
let serverSelector;
531533
if (typeof selector !== 'function') {
@@ -602,6 +604,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
602604
previousServer: options.previousServer
603605
};
604606

607+
const abortListener = addAbortListener(options.signal, function () {
608+
waitQueueMember.cancelled = true;
609+
reject(this.reason);
610+
});
611+
605612
this.waitQueue.push(waitQueueMember);
606613
processWaitQueue(this);
607614

@@ -647,6 +654,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
647654
// Other server selection error
648655
throw error;
649656
} finally {
657+
abortListener?.[kDispose]();
650658
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
651659
}
652660
}

‎src/utils.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
MongoRuntimeError
2828
} from './error';
2929
import type { MongoClient } from './mongo_client';
30+
import { type Abortable } from './mongo_types';
3031
import type { CommandOperationOptions, OperationParent } from './operations/command';
3132
import type { Hint, OperationOptions } from './operations/operation';
3233
import { ReadConcern } from './read_concern';
@@ -1349,19 +1350,24 @@ export const randomBytes = promisify(crypto.randomBytes);
13491350
* @param ee - An event emitter that may emit `ev`
13501351
* @param name - An event name to wait for
13511352
*/
1352-
export async function once<T>(ee: EventEmitter, name: string): Promise<T> {
1353+
export async function once<T>(ee: EventEmitter, name: string, options?: Abortable): Promise<T> {
1354+
options?.signal?.throwIfAborted();
1355+
13531356
const { promise, resolve, reject } = promiseWithResolvers<T>();
13541357
const onEvent = (data: T) => resolve(data);
13551358
const onError = (error: Error) => reject(error);
1359+
const abortListener = addAbortListener(options?.signal, function () {
1360+
reject(this.reason);
1361+
});
13561362

13571363
ee.once(name, onEvent).once('error', onError);
1364+
13581365
try {
1359-
const res = await promise;
1360-
ee.off('error', onError);
1361-
return res;
1362-
} catch (error) {
1366+
return await promise;
1367+
} finally {
13631368
ee.off(name, onEvent);
1364-
throw error;
1369+
ee.off('error', onError);
1370+
abortListener?.[kDispose]();
13651371
}
13661372
}
13671373

@@ -1468,3 +1474,17 @@ export function decorateDecryptionResult(
14681474
decorateDecryptionResult(decrypted[k], originalValue, false);
14691475
}
14701476
}
1477+
1478+
export const kDispose: unique symbol = (Symbol.dispose as any) ?? Symbol('dispose');
1479+
export interface Disposable {
1480+
[kDispose](): void;
1481+
}
1482+
1483+
export function addAbortListener(
1484+
signal: AbortSignal | undefined | null,
1485+
listener: (this: AbortSignal, event: Event) => void
1486+
): Disposable | undefined {
1487+
if (signal == null) return;
1488+
signal.addEventListener('abort', listener);
1489+
return { [kDispose]: () => signal.removeEventListener('abort', listener) };
1490+
}

‎test/integration/client-side-encryption/driver.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -831,12 +831,12 @@ describe('CSOT', function () {
831831
describe('State machine', function () {
832832
const stateMachine = new StateMachine({} as any);
833833

834-
const timeoutContext = () => {
835-
return new CSOTTimeoutContext({
834+
const timeoutContext = () => ({
835+
timeoutContext: new CSOTTimeoutContext({
836836
timeoutMS: 1000,
837837
serverSelectionTimeoutMS: 30000
838-
});
839-
};
838+
})
839+
});
840840

841841
const timeoutMS = 1000;
842842

@@ -1001,7 +1001,7 @@ describe('CSOT', function () {
10011001

10021002
const { result: error } = await measureDuration(() =>
10031003
stateMachine
1004-
.fetchKeys(client, 'test.test', BSON.serialize({}), timeoutContext)
1004+
.fetchKeys(client, 'test.test', BSON.serialize({}), { timeoutContext })
10051005
.catch(e => e)
10061006
);
10071007
expect(error).to.be.instanceOf(MongoOperationTimeoutError);

‎test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ describe('CSOT spec unit tests', function () {
138138
timeoutMS: 500,
139139
serverSelectionTimeoutMS: 30000
140140
});
141-
const err = await stateMachine.kmsRequest(request, timeoutContext).catch(e => e);
141+
const err = await stateMachine.kmsRequest(request, { timeoutContext }).catch(e => e);
142142
expect(err).to.be.instanceOf(MongoOperationTimeoutError);
143143
expect(err.errmsg).to.equal('KMS request timed out');
144144
});

‎test/integration/node-specific/abort_signal.test.ts

Lines changed: 872 additions & 0 deletions
Large diffs are not rendered by default.

‎test/tools/utils.ts

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ export async function clearFailPoint(configuration: TestConfiguration, url = con
622622

623623
export async function makeMultiBatchWrite(
624624
configuration: TestConfiguration
625-
): Promise<AnyClientBulkWriteModel[]> {
625+
): Promise<AnyClientBulkWriteModel<any>[]> {
626626
const { maxBsonObjectSize, maxMessageSizeBytes } = await configuration.hello();
627627

628628
const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
@@ -637,10 +637,10 @@ export async function makeMultiBatchWrite(
637637

638638
export async function makeMultiResponseBatchModelArray(
639639
configuration: TestConfiguration
640-
): Promise<AnyClientBulkWriteModel[]> {
640+
): Promise<AnyClientBulkWriteModel<any>[]> {
641641
const { maxBsonObjectSize } = await configuration.hello();
642642
const namespace = `foo.${new BSON.ObjectId().toHexString()}`;
643-
const models: AnyClientBulkWriteModel[] = [
643+
const models: AnyClientBulkWriteModel<any>[] = [
644644
{
645645
name: 'updateOne',
646646
namespace,
@@ -693,3 +693,42 @@ export function mergeTestMetadata(
693693
}
694694
};
695695
}
696+
697+
export function findLast<T, S extends T>(
698+
array: T[],
699+
predicate: (value: T, index: number, array: T[]) => value is S,
700+
thisArg?: any
701+
): S | undefined;
702+
export function findLast<T>(
703+
array: T[],
704+
predicate: (value: T, index: number, array: T[]) => boolean,
705+
thisArg?: any
706+
): T | undefined;
707+
export function findLast(
708+
array: unknown[],
709+
predicate: (value: unknown, index: number, array: unknown[]) => boolean,
710+
thisArg?: any
711+
): unknown | undefined {
712+
if (typeof array.findLast === 'function') return array.findLast(predicate, thisArg);
713+
714+
for (let i = array.length - 1; i >= 0; i--) {
715+
if (predicate.call(thisArg, array[i], i, array)) {
716+
return array[i];
717+
}
718+
}
719+
720+
return undefined;
721+
}
722+
723+
// Node.js 16 doesn't make this global, but it can still be obtained.
724+
export const DOMException: {
725+
new: (
726+
message?: string,
727+
nameOrOptions?: string | { name?: string; cause?: unknown }
728+
) => DOMException;
729+
} = (() => {
730+
if (globalThis.DOMException != null) return globalThis.DOMException;
731+
const ac = new AbortController();
732+
ac.abort();
733+
return ac.signal.reason.constructor;
734+
})();

‎test/unit/client-side-encryption/state_machine.test.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,12 @@ describe('StateMachine', function () {
8181
a: new Long('0'),
8282
b: new Int32(0)
8383
};
84-
const options = { promoteLongs: false, promoteValues: false };
84+
const options = {
85+
promoteLongs: false,
86+
promoteValues: false,
87+
signal: undefined,
88+
timeoutMS: undefined
89+
};
8590
const serializedCommand = serialize(command);
8691
const stateMachine = new StateMachine({} as any);
8792

@@ -493,7 +498,7 @@ describe('StateMachine', function () {
493498
});
494499

495500
await stateMachine
496-
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), context)
501+
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), { timeoutContext: context })
497502
.catch(e => squashError(e));
498503

499504
const { timeoutContext } = findSpy.getCalls()[0].args[1] as FindOptions;
@@ -535,7 +540,7 @@ describe('StateMachine', function () {
535540
});
536541
await sleep(300);
537542
await stateMachine
538-
.markCommand(client, 'keyVault', BSON.serialize({ a: 1 }), timeoutContext)
543+
.markCommand(client, 'keyVault', BSON.serialize({ a: 1 }), { timeoutContext })
539544
.catch(e => squashError(e));
540545
expect(dbCommandSpy.getCalls()[0].args[1].timeoutMS).to.not.be.undefined;
541546
expect(dbCommandSpy.getCalls()[0].args[1].timeoutMS).to.be.lessThanOrEqual(205);
@@ -576,7 +581,9 @@ describe('StateMachine', function () {
576581
});
577582
await sleep(300);
578583
await stateMachine
579-
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), context)
584+
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), {
585+
timeoutContext: context
586+
})
580587
.catch(e => squashError(e));
581588
const [_filter, { timeoutContext }] = listCollectionsSpy.getCalls()[0].args;
582589
expect(timeoutContext).to.exist;

0 commit comments

Comments
 (0)
Please sign in to comment.