Skip to content

Commit 4709a48

Browse files
authored
fix: added fix for memory leak in sockets (#10788)
1 parent be33360 commit 4709a48

2 files changed

Lines changed: 336 additions & 18 deletions

File tree

lib/adapters/http.js

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ const { http: httpFollow, https: httpsFollow } = followRedirects;
4848

4949
const isHttps = /https:?/;
5050

51+
// Symbols used to bind a single 'error' listener to a pooled socket and track
52+
// the request currently owning that socket across keep-alive reuse (issue #10780).
53+
const kAxiosSocketListener = Symbol('axios.http.socketListener');
54+
const kAxiosCurrentReq = Symbol('axios.http.currentReq');
55+
5156
const supportedProtocols = platform.protocols.map((protocol) => {
5257
return protocol + ':';
5358
});
@@ -943,20 +948,28 @@ export default isHttpAdapterSupported &&
943948
// default interval of sending ack packet is 1 minute
944949
socket.setKeepAlive(true, 1000 * 60);
945950

946-
const removeSocketErrorListener = () => {
947-
socket.removeListener('error', handleRequestSocketError);
948-
};
951+
// Install a single 'error' listener per socket (not per request) to avoid
952+
// accumulating listeners on pooled keep-alive sockets that get reassigned
953+
// to new requests before the previous request's 'close' fires (issue #10780).
954+
// The listener is bound to the socket's currently-active request via a
955+
// symbol, which is swapped as the socket is reassigned.
956+
if (!socket[kAxiosSocketListener]) {
957+
socket.on('error', function handleSocketError(err) {
958+
const current = socket[kAxiosCurrentReq];
959+
if (current && !current.destroyed) {
960+
current.destroy(err);
961+
}
962+
});
963+
socket[kAxiosSocketListener] = true;
964+
}
949965

950-
function handleRequestSocketError(err) {
951-
removeSocketErrorListener();
966+
socket[kAxiosCurrentReq] = req;
952967

953-
if (!req.destroyed) {
954-
req.destroy(err);
968+
req.once('close', function clearCurrentReq() {
969+
if (socket[kAxiosCurrentReq] === req) {
970+
socket[kAxiosCurrentReq] = null;
955971
}
956-
}
957-
958-
socket.on('error', handleRequestSocketError);
959-
req.once('close', removeSocketErrorListener);
972+
});
960973
});
961974

962975
// Handle request timeout

tests/unit/adapters/http.test.js

Lines changed: 312 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4079,6 +4079,120 @@ describe('supports http with nodejs', () => {
40794079
});
40804080

40814081
describe('keep-alive', () => {
4082+
it('should not emit MaxListenersExceededWarning under concurrent requests through a pooled keep-alive agent (regression #10780)', async () => {
4083+
const server = await startHTTPServer(
4084+
(req, res) => {
4085+
// Small delay forces concurrent requests to queue on the single pooled socket.
4086+
setTimeout(() => {
4087+
res.writeHead(200, { 'Content-Type': 'text/plain' });
4088+
res.end('ok');
4089+
}, 5);
4090+
},
4091+
{ port: SERVER_PORT }
4092+
);
4093+
4094+
const warnings = [];
4095+
const warningHandler = (warning) => {
4096+
if (warning && warning.name === 'MaxListenersExceededWarning') {
4097+
warnings.push(warning);
4098+
}
4099+
};
4100+
process.on('warning', warningHandler);
4101+
4102+
const agent = new http.Agent({ keepAlive: true, maxSockets: 1 });
4103+
4104+
try {
4105+
const baseURL = `http://localhost:${server.address().port}`;
4106+
const CONCURRENCY = 30;
4107+
4108+
const results = await Promise.all(
4109+
Array.from({ length: CONCURRENCY }, (_, i) =>
4110+
axios.get(`/req-${i}`, { baseURL, httpAgent: agent })
4111+
)
4112+
);
4113+
4114+
assert.strictEqual(results.length, CONCURRENCY);
4115+
for (const r of results) {
4116+
assert.strictEqual(r.status, 200);
4117+
assert.strictEqual(r.data, 'ok');
4118+
}
4119+
4120+
// Allow any deferred process 'warning' emissions to flush.
4121+
await setTimeoutAsync(50);
4122+
4123+
assert.strictEqual(
4124+
warnings.length,
4125+
0,
4126+
`expected no MaxListenersExceededWarning, got ${warnings.length}: ${warnings.map((w) => w.message).join('; ')}`
4127+
);
4128+
4129+
// Inspect live sockets on the agent: none should have more than one
4130+
// axios-installed error listener, regardless of how many requests ran.
4131+
const allSockets = []
4132+
.concat(...Object.values(agent.sockets || {}))
4133+
.concat(...Object.values(agent.freeSockets || {}));
4134+
for (const sock of allSockets) {
4135+
assert.ok(
4136+
sock.listenerCount('error') <= 2,
4137+
`socket should have at most a couple of error listeners (agent + axios), got ${sock.listenerCount('error')}`
4138+
);
4139+
}
4140+
} finally {
4141+
process.removeListener('warning', warningHandler);
4142+
agent.destroy();
4143+
await stopHTTPServer(server);
4144+
}
4145+
}, 30000);
4146+
4147+
it('should not leak memory via retained request closures under a long burst of keep-alive requests (regression #10780)', async () => {
4148+
// This guards against stage88's report of OOM at ~480k sequential requests:
4149+
// if the per-request closure leaked, heap would grow linearly. We simulate
4150+
// a shorter burst and verify retained closures are released (via WeakRef
4151+
// reachability check after GC, if exposed).
4152+
if (typeof global.gc !== 'function') {
4153+
// Skip when GC is not exposed (run with `node --expose-gc`).
4154+
return;
4155+
}
4156+
4157+
const server = await startHTTPServer(
4158+
(req, res) => {
4159+
res.writeHead(200);
4160+
res.end('ok');
4161+
},
4162+
{ port: SERVER_PORT }
4163+
);
4164+
4165+
const agent = new http.Agent({ keepAlive: true, maxSockets: 4 });
4166+
4167+
try {
4168+
const baseURL = `http://localhost:${server.address().port}`;
4169+
4170+
const refs = [];
4171+
for (let i = 0; i < 200; i += 1) {
4172+
// eslint-disable-next-line no-await-in-loop
4173+
const response = await axios.get('/', { baseURL, httpAgent: agent });
4174+
refs.push(new WeakRef(response.request));
4175+
}
4176+
4177+
// Drop strong refs and force GC.
4178+
global.gc();
4179+
await setTimeoutAsync(10);
4180+
global.gc();
4181+
4182+
const retained = refs.filter((r) => r.deref() !== undefined).length;
4183+
// Some trailing requests may still be referenced in internal buffers.
4184+
// The fix's correctness: retained count scales with agent socket count,
4185+
// NOT with request count. A pre-fix leak would keep >>socket count.
4186+
assert.ok(
4187+
retained <= 20,
4188+
`expected most request objects to be collectible after GC; ${retained}/200 retained suggests a closure leak`
4189+
);
4190+
} finally {
4191+
agent.destroy();
4192+
await stopHTTPServer(server);
4193+
}
4194+
}, 30000);
4195+
40824196
it('should not fail with "socket hang up" when using timeouts', async () => {
40834197
const server = await startHTTPServer(
40844198
async (req, res) => {
@@ -4100,7 +4214,7 @@ describe('supports http with nodejs', () => {
41004214
}
41014215
}, 15000);
41024216

4103-
it('should remove request socket error listeners after keep-alive requests close', async () => {
4217+
it('should install at most one socket error listener across reused keep-alive sockets', async () => {
41044218
const noop = () => {};
41054219
const socket = new EventEmitter();
41064220
socket.setKeepAlive = noop;
@@ -4146,19 +4260,210 @@ describe('supports http with nodejs', () => {
41464260
},
41474261
};
41484262

4263+
// First request: axios installs its single per-socket listener.
41494264
await axios.get('http://example.com/first', {
41504265
transport,
41514266
maxRedirects: 0,
41524267
});
41534268
await setTimeoutAsync(0);
4154-
assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount);
4269+
assert.strictEqual(
4270+
socket.listenerCount('error'),
4271+
baseErrorListenerCount + 1,
4272+
'axios should install exactly one socket error listener'
4273+
);
41554274

4156-
await axios.get('http://example.com/second', {
4157-
transport,
4158-
maxRedirects: 0,
4159-
});
4275+
// Many subsequent requests reusing the same socket must not add more listeners.
4276+
for (let i = 0; i < 20; i += 1) {
4277+
// eslint-disable-next-line no-await-in-loop
4278+
await axios.get(`http://example.com/next-${i}`, {
4279+
transport,
4280+
maxRedirects: 0,
4281+
});
4282+
// eslint-disable-next-line no-await-in-loop
4283+
await setTimeoutAsync(0);
4284+
assert.strictEqual(
4285+
socket.listenerCount('error'),
4286+
baseErrorListenerCount + 1,
4287+
'listener count must stay constant across keep-alive reuse'
4288+
);
4289+
}
4290+
});
4291+
4292+
it('should not accumulate socket error listeners when a pooled socket is reassigned before the previous request closes (regression #10780)', async () => {
4293+
const noop = () => {};
4294+
const socket = new EventEmitter();
4295+
socket.setKeepAlive = noop;
4296+
socket.on('error', noop);
4297+
4298+
const baseErrorListenerCount = socket.listenerCount('error');
4299+
4300+
// Each request defers its 'close' emission so that the socket is
4301+
// reassigned to the next request before the previous one closes.
4302+
// This reproduces the race condition described in #10780.
4303+
const pendingRequests = [];
4304+
4305+
const transport = {
4306+
request(_, cb) {
4307+
const req = new (class MockRequest extends EventEmitter {
4308+
constructor() {
4309+
super();
4310+
this.destroyed = false;
4311+
}
4312+
4313+
setTimeout() {}
4314+
write() {}
4315+
4316+
end() {
4317+
// Share the single pooled socket across every request.
4318+
this.emit('socket', socket);
4319+
4320+
setImmediate(() => {
4321+
const response = stream.Readable.from(['ok']);
4322+
response.statusCode = 200;
4323+
response.headers = {};
4324+
cb(response);
4325+
// Intentionally do NOT emit 'close' yet. Collect the req
4326+
// so close can be emitted later, after other reqs have
4327+
// already claimed the socket.
4328+
pendingRequests.push(this);
4329+
});
4330+
}
4331+
4332+
destroy(err) {
4333+
if (this.destroyed) return;
4334+
this.destroyed = true;
4335+
err && this.emit('error', err);
4336+
this.emit('close');
4337+
}
4338+
})();
4339+
4340+
return req;
4341+
},
4342+
};
4343+
4344+
const results = await Promise.all(
4345+
Array.from({ length: 20 }, (_, i) =>
4346+
axios.get(`http://example.com/concurrent-${i}`, {
4347+
transport,
4348+
maxRedirects: 0,
4349+
})
4350+
)
4351+
);
4352+
4353+
assert.strictEqual(results.length, 20);
4354+
4355+
// Critical assertion: despite 20 concurrent requests all claiming the
4356+
// same pooled socket before any emitted 'close', only ONE axios listener
4357+
// must be attached. This is the difference between the pre-fix
4358+
// behaviour (20 listeners, MaxListenersExceededWarning) and the fix.
4359+
assert.strictEqual(
4360+
socket.listenerCount('error'),
4361+
baseErrorListenerCount + 1,
4362+
`expected a single axios socket error listener under concurrent reuse, got ${socket.listenerCount('error') - baseErrorListenerCount}`
4363+
);
4364+
4365+
// Now drain the queued close events. Listener count must still be 1.
4366+
for (const req of pendingRequests) {
4367+
req.emit('close');
4368+
}
41604369
await setTimeoutAsync(0);
4161-
assert.strictEqual(socket.listenerCount('error'), baseErrorListenerCount);
4370+
4371+
assert.strictEqual(
4372+
socket.listenerCount('error'),
4373+
baseErrorListenerCount + 1,
4374+
'listener must persist on the socket after requests close (cleanup is per-request ownership, not per-listener removal)'
4375+
);
4376+
});
4377+
4378+
it('should route a socket error to the currently-active request after the socket has been reassigned', async () => {
4379+
const noop = () => {};
4380+
const socket = new EventEmitter();
4381+
socket.setKeepAlive = noop;
4382+
socket.on('error', noop);
4383+
4384+
const createdReqs = [];
4385+
4386+
// First transport: completes cleanly (emits response then close).
4387+
const cleanTransport = {
4388+
request(_, cb) {
4389+
const emitter = new (class MockRequest extends EventEmitter {
4390+
constructor() {
4391+
super();
4392+
this.destroyed = false;
4393+
createdReqs.push(this);
4394+
}
4395+
setTimeout() {}
4396+
write() {}
4397+
end() {
4398+
this.emit('socket', socket);
4399+
setImmediate(() => {
4400+
const response = stream.Readable.from(['ok']);
4401+
response.statusCode = 200;
4402+
response.headers = {};
4403+
cb(response);
4404+
this.emit('close');
4405+
});
4406+
}
4407+
destroy(err) {
4408+
if (this.destroyed) return;
4409+
this.destroyed = true;
4410+
err && this.emit('error', err);
4411+
this.emit('close');
4412+
}
4413+
})();
4414+
return emitter;
4415+
},
4416+
};
4417+
4418+
// Second transport: emits socket error instead of a response.
4419+
const errorTransport = {
4420+
request() {
4421+
const emitter = new (class MockRequest extends EventEmitter {
4422+
constructor() {
4423+
super();
4424+
this.destroyed = false;
4425+
createdReqs.push(this);
4426+
}
4427+
setTimeout() {}
4428+
write() {}
4429+
end() {
4430+
this.emit('socket', socket);
4431+
setImmediate(() => {
4432+
socket.emit('error', Object.assign(new Error('boom'), { code: 'EPIPE' }));
4433+
});
4434+
}
4435+
destroy(err) {
4436+
if (this.destroyed) return;
4437+
this.destroyed = true;
4438+
err && this.emit('error', err);
4439+
this.emit('close');
4440+
}
4441+
})();
4442+
return emitter;
4443+
},
4444+
};
4445+
4446+
// First request completes successfully; socket is released.
4447+
await axios.get('http://example.com/first', { transport: cleanTransport, maxRedirects: 0 });
4448+
await setTimeoutAsync(0);
4449+
4450+
const firstReq = createdReqs[0];
4451+
assert.ok(firstReq && firstReq.destroyed === false, 'first request must not have been destroyed by a socket error');
4452+
4453+
// Stray socket error after first req has closed: must not destroy firstReq.
4454+
socket.emit('error', new Error('stray error after close'));
4455+
assert.strictEqual(firstReq.destroyed, false, 'socket error after close must not destroy the old request');
4456+
4457+
// Second request claims the socket, then its socket errors. It should reject.
4458+
const err = await axios
4459+
.get('http://example.com/second', { transport: errorTransport, maxRedirects: 0 })
4460+
.catch((e) => e);
4461+
4462+
assert.ok(err instanceof AxiosError, 'second request should reject with an AxiosError');
4463+
assert.strictEqual(err.code, 'EPIPE');
4464+
4465+
const secondReq = createdReqs[1];
4466+
assert.strictEqual(secondReq.destroyed, true, 'second request should be destroyed by its own active socket error');
41624467
});
41634468
});
41644469

0 commit comments

Comments
 (0)