Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 92 additions & 11 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,22 @@ const {
isReadable,
isReadableNodeStream,
isNodeStream,
isWritableStream,
isTransformStream,
isWebStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;

function lazyloadReadable() {
if (!Readable) {
Readable = require('internal/streams/readable');
}
return Readable;
}

function destroyer(stream, reading, writing) {
let finished = false;
stream.on('close', () => {
Expand Down Expand Up @@ -81,17 +91,17 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!Readable) {
Readable = require('internal/streams/readable');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary change

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok reverting this

}

async function pump(iterable, writable, finish, { end }) {
let error;
let onresolve = null;

if (isTransformStream(writable)) {
writable = writable.writable;
}

const resume = (err) => {
if (err) {
error = err;
Expand All @@ -118,22 +128,34 @@ async function pump(iterable, writable, finish, { end }) {
}
});

writable.on('drain', resume);
if (typeof writable.on === 'function') {
writable.on('drain', resume);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for node streams... not web streams

}
const cleanup = eos(writable, { readable: false }, resume);

try {
if (writable.writableNeedDrain) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for node streams... not web streams

await wait();
}

let writer = writable;
let endFn = () => {};
if (isWritableStream(writable)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this always be true here?

writer = writable.getWriter();
endFn = writer.close.bind(writer);
} else {
endFn = writer.end.bind(writer);
}

for await (const chunk of iterable) {
if (!writable.write(chunk)) {
const written = writer.write(chunk);
if (!written) {
await wait();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How and when does this wait resume?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm yes this part is wrong writer.write will be promise
mistake 😅😅

}
}

if (end) {
writable.end();
endFn();
}

await wait();
Comment thread
debadree25 marked this conversation as resolved.
Outdated
Expand All @@ -143,7 +165,9 @@ async function pump(iterable, writable, finish, { end }) {
finish(error !== err ? aggregateTwoErrors(error, err) : err);
} finally {
cleanup();
writable.off('drain', resume);
if (typeof writable.off === 'function') {
writable.off('drain', resume);
}
}
}
Comment thread
debadree25 marked this conversation as resolved.

Expand Down Expand Up @@ -259,8 +283,13 @@ function pipelineImpl(streams, callback, opts) {
ret = Duplex.from(stream);
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret = stream(ret, { signal });
if (isTransformStream(ret)) {
ret = makeAsyncIterable(ret?.readable);
ret = stream(ret, { signal });
} else {
ret = makeAsyncIterable(ret);
ret = stream(ret, { signal });
}

if (reading) {
if (!isIterable(ret, true)) {
Expand Down Expand Up @@ -327,6 +356,23 @@ function pipelineImpl(streams, callback, opts) {
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
finishCount++;
pump(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
}
ret = stream;
} else if (isWebStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
pipeNodeToWeb(ret, stream, finish, { end });
Comment thread
debadree25 marked this conversation as resolved.
Outdated
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
pump(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret);
Expand Down Expand Up @@ -392,4 +438,39 @@ function pipe(src, dst, finish, { end }) {
return eos(dst, { readable: false, writable: true }, finish);
}

function pipeNodeToWeb(src, dst, finish, { end }) {
let writable = dst;
if (isTransformStream(dst)) {
writable = dst.writable;
}
const writer = writable.getWriter();

src.on('data', (chunk) => {
writer.write(chunk);
});

if (end) {
src.once('end', () => {
writer.close();
});
} else {
finish();
}
eos(src, { readable: true, writable: false }, (err) => {
const rState = src._readableState;
if (
err &&
err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
(rState && rState.ended && !rState.errored && !rState.errorEmitted)
) {
src
.once('end', finish)
.once('error', finish);
} else {
finish(err);
}
});
return eos(writable, finish);
}

module.exports = { pipelineImpl, pipeline };
15 changes: 15 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ function isWritableStream(obj) {
);
}

function isTransformStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);
}

function isWebStream(obj) {
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -303,6 +316,7 @@ module.exports = {
isReadableFinished,
isReadableErrored,
isNodeStream,
isWebStream,
isWritable,
isWritableNodeStream,
isWritableStream,
Expand All @@ -312,4 +326,5 @@ module.exports = {
isServerRequest,
isServerResponse,
willEmitClose,
isTransformStream,
};
3 changes: 2 additions & 1 deletion lib/stream/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
const {
isIterable,
isNodeStream,
isWebStream,
} = require('internal/streams/utils');

const { pipelineImpl: pl } = require('internal/streams/pipeline');
Expand All @@ -21,7 +22,7 @@ function pipeline(...streams) {
let end;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) {
!isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
end = options.end;
Expand Down
Loading