Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit 41fefa4

Browse files
authored
fix: reset stream when over inbound stream limit (#193)
Instead of creating a stream or throwing an error, discard any NEW_STREAM messages that put us over the inbound stream limit and send a message to the initiator resetting that stream.
1 parent 7c39830 commit 41fefa4

File tree

2 files changed

+33
-15
lines changed

2 files changed

+33
-15
lines changed

src/mplex.ts

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,27 +147,19 @@ export class MplexStreamMuxer implements StreamMuxer {
147147
}
148148

149149
_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
150-
if (this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) {
151-
throw errCode(new Error('To many outgoing streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS')
152-
}
153-
154-
if (this._streams.receivers.size === (this._init.maxInboundStreams ?? MAX_STREAMS_INBOUND_STREAMS_PER_CONNECTION)) {
155-
throw errCode(new Error('To many incoming streams open'), 'ERR_TOO_MANY_INBOUND_STREAMS')
156-
}
157-
158150
const { id, name, type, registry } = options
159151

160152
log('new %s stream %s %s', type, id, name)
161153

154+
if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) {
155+
throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS')
156+
}
157+
162158
if (registry.has(id)) {
163159
throw new Error(`${type} stream ${id} already exists!`)
164160
}
165161

166162
const send = (msg: Message) => {
167-
if (!registry.has(id)) {
168-
throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST')
169-
}
170-
171163
if (log.enabled) {
172164
log.trace('%s stream %s send', type, id, printMessage(msg))
173165
}
@@ -180,7 +172,7 @@ export class MplexStreamMuxer implements StreamMuxer {
180172
}
181173

182174
const onEnd = () => {
183-
log('%s stream %s %s ended', type, id, name)
175+
log('%s stream %s ended', type, id, name)
184176
registry.delete(id)
185177

186178
if (this._init.onStreamEnd != null) {
@@ -257,6 +249,20 @@ export class MplexStreamMuxer implements StreamMuxer {
257249

258250
// Create a new stream?
259251
if (message.type === MessageTypes.NEW_STREAM) {
252+
if (this._streams.receivers.size === (this._init.maxInboundStreams ?? MAX_STREAMS_INBOUND_STREAMS_PER_CONNECTION)) {
253+
log.error('Too many inbound streams open')
254+
255+
// not going to allow this stream, send the reset message manually
256+
// instead of setting it up just to tear it down
257+
258+
this._source.push({
259+
id,
260+
type: MessageTypes.RESET_RECEIVER
261+
})
262+
263+
return
264+
}
265+
260266
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) })
261267

262268
if (this._init.onIncomingStream != null) {
@@ -288,7 +294,7 @@ export class MplexStreamMuxer implements StreamMuxer {
288294
})
289295

290296
// Inform the stream consumer they are not fast enough
291-
const error = errCode(new Error('Input buffer full - increase Mplex maxBufferSize to accomodate slow consumers'), 'ERR_STREAM_INPUT_BUFFER_FULL')
297+
const error = errCode(new Error('Input buffer full - increase Mplex maxBufferSize to accommodate slow consumers'), 'ERR_STREAM_INPUT_BUFFER_FULL')
292298
stream.abort(error)
293299

294300
return

test/mplex.spec.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,22 @@ describe('mplex', () => {
6262
const data = uint8ArrayConcat(await all(encode(source)))
6363

6464
stream.push(data)
65+
stream.end()
66+
67+
const bufs: Uint8Array[] = []
68+
69+
void Promise.resolve().then(async () => {
70+
for await (const buf of muxer.source) {
71+
bufs.push(buf)
72+
}
73+
})
6574

6675
await muxer.sink(stream)
6776

68-
await expect(all(muxer.source)).to.eventually.be.rejected.with.property('code', 'ERR_TOO_MANY_INBOUND_STREAMS')
77+
const messages = await all(decode(bufs))
78+
79+
expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
80+
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
6981
})
7082

7183
it('should reset a stream that fills the message buffer', async () => {

0 commit comments

Comments
 (0)