Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions examples/h2-other-side-closed-exit-0-fetch.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const { fetch, setGlobalDispatcher, Agent } = require('..');

setGlobalDispatcher(new Agent({
allowH2: true,
}));

async function main() {
for (let i = 0; i < 100; i++) {
try {
const r = await fetch('https://edgeupdates.microsoft.com/api/products');
console.log(r.status, r.headers, (await r.text()).length);
} catch (err) {
// console.error(err);
// throw err;
if (err.code === 'UND_ERR_SOCKET') {
continue;
} else {
throw err;
}
Comment thread
fengmk2 marked this conversation as resolved.
}
}
}

main().then(() => {
console.log('main end');
}).catch(err => {
console.error('main error throw: %s', err);
// console.error(err);
process.exit(1);
});

process.on('beforeExit', (...args) => {
console.error('beforeExit', args);
});
34 changes: 34 additions & 0 deletions examples/h2-other-side-closed-exit-0.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const { request, Agent, setGlobalDispatcher } = require('undici');

setGlobalDispatcher(new Agent({
allowH2: true,
}));
Comment thread
fengmk2 marked this conversation as resolved.

async function main() {
for (let i = 0; i < 100; i++) {
try {
const r = await request('https://edgeupdates.microsoft.com/api/products');
console.log(r.statusCode, r.headers, (await r.body.blob()).size);
} catch (err) {
// console.error(err);
// throw err;
if (err.code === 'UND_ERR_SOCKET') {
continue;
} else {
throw err;
}
}
}
}
Comment thread
fengmk2 marked this conversation as resolved.

main().then(() => {
console.log('main end');
}).catch(err => {
console.error('main error throw: %s', err);
// console.error(err);
process.exit(1);
});

process.on('beforeExit', (...args) => {
console.error('beforeExit', args);
});
Comment thread
fengmk2 marked this conversation as resolved.
49 changes: 49 additions & 0 deletions examples/longruning.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const { HttpClient } = require('..');

const httpClient = new HttpClient({
allowH2: true,
});

async function main() {
for (let i = 0; i < 1000000; i++) {
// await httpClient.request('https://registry.npmmirror.com/');
// console.log(r.status, r.headers, r.res.timing);
try {
const r = await httpClient.request('https://edgeupdates.microsoft.com/api/products');
// console.log(r.status, r.headers, r.data.length, r.res.timing);
if (i % 10 === 0) {
Comment thread
fengmk2 marked this conversation as resolved.
// console.log(r.status, r.headers, r.data.length, r.res.timing);
console.log(i, r.status, process.memoryUsage());
}
} catch (err) {
console.error('%s error: %s', i, err.message);
}
}
Comment thread
fengmk2 marked this conversation as resolved.
}

main().then(() => {
console.log('main end');
}).catch(err => {
console.error('main error throw: %s', err);
console.error(err);
process.exit(1);
});

// process.on('uncaughtException', (...args) => {
// console.error('uncaughtException', args);
// process.exit(1);
// });

// process.on('unhandledRejection', (...args) => {
// console.error('unhandledRejection', args);
// process.exit(2);
// });

// process.on('uncaughtExceptionMonitor', (...args) => {
// console.error('uncaughtExceptionMonitor', args);
// process.exit(2);
// });

process.on('beforeExit', (...args) => {
console.error('beforeExit', args);
});
Comment thread
fengmk2 marked this conversation as resolved.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@
"prepublishOnly": "npm run build"
},
"dependencies": {
"form-data": "^4.0.1",
"formstream": "^1.5.1",
"mime-types": "^2.1.35",
"qs": "^6.12.1",
"type-fest": "^4.20.1",
"undici": "^6.19.2",
"undici": "^7.0.0",
"ylru": "^2.0.0"
},
"devDependencies": {
Expand All @@ -68,6 +69,7 @@
"cross-env": "^7.0.3",
"eslint": "8",
"eslint-config-egg": "14",
"https-pem": "^3.0.0",
"iconv-lite": "^0.6.3",
"proxy": "^1.0.2",
"selfsigned": "^2.0.1",
Expand Down
2 changes: 1 addition & 1 deletion src/FetchOpaqueInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface OpaqueInterceptorOptions {
export function fetchOpaqueInterceptor(opts: OpaqueInterceptorOptions) {
const opaqueLocalStorage = opts?.opaqueLocalStorage;
return (dispatch: Dispatcher['dispatch']): Dispatcher['dispatch'] => {
return function redirectInterceptor(opts: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandlers) {
return function redirectInterceptor(opts: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandler) {
const opaque = opaqueLocalStorage?.getStore();
(handler as any).opaque = opaque;
return dispatch(opts, handler);
Expand Down
32 changes: 32 additions & 0 deletions src/FormData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import path from 'node:path';
import _FormData from 'form-data';

export class FormData extends _FormData {
_getContentDisposition(value: any, options: any) {
// support non-ascii filename
// https://github.com/form-data/form-data/pull/571
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let filename;
let contentDisposition;

if (typeof options.filepath === 'string') {
// custom filepath for relative paths
filename = path.normalize(options.filepath).replace(/\\/g, '/');
} else if (options.filename || value.name || value.path) {
// custom filename take precedence
// formidable and the browser add a name property
// fs- and request- streams have path property
filename = path.basename(options.filename || value.name || value.path);
} else if (value.readable && value.hasOwnProperty('httpVersion')) {
// or try http response
filename = path.basename(value.client._httpMessage.path || '');
Comment thread
fengmk2 marked this conversation as resolved.
}

if (filename) {
// https://datatracker.ietf.org/doc/html/rfc6266#section-4.1
// support non-ascii filename
contentDisposition = 'filename="' + filename + '"; filename*=UTF-8\'\'' + encodeURIComponent(filename);
}
Comment thread
fengmk2 marked this conversation as resolved.

return contentDisposition;
}
}
2 changes: 1 addition & 1 deletion src/HttpAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class HttpAgent extends Agent {
this.#checkAddress = options.checkAddress;
}

dispatch(options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean {
dispatch(options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean {
if (this.#checkAddress && options.origin) {
const originUrl = typeof options.origin === 'string' ? new URL(options.origin) : options.origin;
let hostname = originUrl.hostname;
Expand Down
64 changes: 27 additions & 37 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
gunzipSync,
brotliDecompressSync,
} from 'node:zlib';
import { Blob } from 'node:buffer';
import { Readable, pipeline } from 'node:stream';
import { pipeline as pipelinePromise } from 'node:stream/promises';
import { basename } from 'node:path';
Expand All @@ -19,13 +18,13 @@ import { performance } from 'node:perf_hooks';
import querystring from 'node:querystring';
import { setTimeout as sleep } from 'node:timers/promises';
import {
FormData,
request as undiciRequest,
Dispatcher,
Agent,
getGlobalDispatcher,
Pool,
} from 'undici';
import { FormData } from './FormData.js';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import undiciSymbols from 'undici/lib/core/symbols.js';
Expand All @@ -49,7 +48,7 @@ type IUndiciRequestOption = PropertyShouldBe<UndiciRequestOption, 'headers', Inc

export const PROTO_RE = /^https?:\/\//i;

export interface UnidiciTimingInfo {
export interface UndiciTimingInfo {
startTime: number;
redirectStartTime: number;
redirectEndTime: number;
Expand All @@ -70,6 +69,9 @@ export interface UnidiciTimingInfo {
};
}

// keep typo compatibility
export interface UnidiciTimingInfo extends UndiciTimingInfo {}

function noop() {
// noop
}
Expand Down Expand Up @@ -113,28 +115,6 @@ export type ClientOptions = {
},
};

// https://github.com/octet-stream/form-data
class BlobFromStream {
#stream;
#type;
constructor(stream: Readable, type: string) {
this.#stream = stream;
this.#type = type;
}

stream() {
return this.#stream;
}

get type(): string {
return this.#type;
}

get [Symbol.toStringTag]() {
return 'Blob';
}
}

export const VERSION = 'VERSION';
// 'node-urllib/4.0.0 Node.js/18.19.0 (darwin; x64)'
export const HEADER_USER_AGENT =
Expand Down Expand Up @@ -487,21 +467,28 @@ export class HttpClient extends EventEmitter {
}
}
for (const [ index, [ field, file, customFileName ]] of uploadFiles.entries()) {
let fileName = '';
let value: any;
if (typeof file === 'string') {
// FIXME: support non-ascii filename
// const fileName = encodeURIComponent(basename(file));
// formData.append(field, await fileFromPath(file, `utf-8''${fileName}`, { type: mime.lookup(fileName) || '' }));
const fileName = basename(file);
const fileReadable = createReadStream(file);
formData.append(field, new BlobFromStream(fileReadable, mime.lookup(fileName) || ''), fileName);
fileName = basename(file);
value = createReadStream(file);
} else if (Buffer.isBuffer(file)) {
formData.append(field, new Blob([ file ]), customFileName || `bufferfile${index}`);
fileName = customFileName || `bufferfile${index}`;
value = file;
} else if (file instanceof Readable || isReadable(file as any)) {
const fileName = getFileName(file) || customFileName || `streamfile${index}`;
formData.append(field, new BlobFromStream(file, mime.lookup(fileName) || ''), fileName);
fileName = getFileName(file) || customFileName || `streamfile${index}`;
isStreamingRequest = true;
value = file;
}
const mimeType = mime.lookup(fileName) || '';
formData.append(field, value, {
filename: fileName,
contentType: mimeType,
});
debug('formData append field: %s, mimeType: %s, fileName: %s',
field, mimeType, fileName);
Comment thread
fengmk2 marked this conversation as resolved.
}
Object.assign(headers, formData.getHeaders());
requestOptions.body = formData;
} else if (args.content) {
if (!isGETOrHEAD) {
Expand Down Expand Up @@ -558,8 +545,8 @@ export class HttpClient extends EventEmitter {
args.socketErrorRetry = 0;
}

debug('Request#%d %s %s, headers: %j, headersTimeout: %s, bodyTimeout: %s, isStreamingRequest: %s',
requestId, requestOptions.method, requestUrl.href, headers, headersTimeout, bodyTimeout, isStreamingRequest);
debug('Request#%d %s %s, headers: %j, headersTimeout: %s, bodyTimeout: %s, isStreamingRequest: %s, maxRedirections: %s',
requestId, requestOptions.method, requestUrl.href, headers, headersTimeout, bodyTimeout, isStreamingRequest, requestOptions.maxRedirections);
requestOptions.headers = headers;
channels.request.publish({
request: reqMeta,
Expand Down Expand Up @@ -701,7 +688,8 @@ export class HttpClient extends EventEmitter {

return clientResponse;
} catch (rawError: any) {
debug('Request#%d throw error: %s', requestId, rawError);
debug('Request#%d throw error: %s, socketErrorRetry: %s, socketErrorRetries: %s',
requestId, rawError, args.socketErrorRetry, requestContext.socketErrorRetries);
Comment thread
fengmk2 marked this conversation as resolved.
Outdated
let err = rawError;
if (err.name === 'HeadersTimeoutError') {
err = new HttpClientRequestTimeoutError(headersTimeout, { cause: err });
Expand All @@ -713,6 +701,8 @@ export class HttpClient extends EventEmitter {
// auto retry on socket error, https://github.com/node-modules/urllib/issues/454
if (args.socketErrorRetry > 0 && requestContext.socketErrorRetries < args.socketErrorRetry) {
requestContext.socketErrorRetries++;
debug('Request#%d retry on socket error, socketErrorRetries: %d',
requestId, requestContext.socketErrorRetries);
return await this.#requestInternal(url, options, requestContext);
}
}
Expand Down
16 changes: 13 additions & 3 deletions src/diagnosticsChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ export function initDiagnosticsChannel() {
subscribe('undici:client:sendHeaders', (message, name) => {
const { request, socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) return;
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}

(socket[symbols.kHandledRequests] as number)++;
// attach socket to opaque
Expand All @@ -165,7 +168,10 @@ export function initDiagnosticsChannel() {
subscribe('undici:request:bodySent', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) return;
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}

debug('[%s] Request#%d send body', name, opaque[symbols.kRequestId]);
if (!opaque[symbols.kEnableRequestTiming]) return;
Expand All @@ -176,7 +182,10 @@ export function initDiagnosticsChannel() {
subscribe('undici:request:headers', (message, name) => {
const { request, response } = message as DiagnosticsChannel.RequestHeadersMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) return;
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}

// get socket from opaque
const socket = opaque[symbols.kRequestSocket];
Expand All @@ -199,6 +208,7 @@ export function initDiagnosticsChannel() {
const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
const opaque = getRequestOpaque(request, kHandler);
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
}

Expand Down
Loading