// Ported from https://github.com/mafintosh/pump with // permission from the author, Mathias Buus (@mafintosh). 'use strict' const abortControllerModule = require('../../../../../watcher/aborter/controller') const { ArrayIsArray, Promise, SymbolAsyncIterator } = require('../../ours/primordials') const eos = require('./end-of-stream') const { once } = require('../../ours/util') const destroyImpl = require('./destroy') const Duplex = require('./duplex') const { aggregateTwoErrors, codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED }, AbortError } = require('../../ours/errors') const { validateFunction, validateAbortSignal } = require('../validators') const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') const AbortController = globalThis.AbortController || abortControllerModule.AbortController let PassThrough let Readable function destroyer(stream, reading, writing) { let finished = false stream.on('close', () => { finished = true }) const cleanup = eos( stream, { readable: reading, writable: writing }, (err) => { finished = !err } ) return { destroy: (err) => { if (finished) return finished = true destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe')) }, cleanup } } function popCallback(streams) { // Streams should never be an empty array. It should always contain at least // a single stream. Therefore optimize for the average case instead of // checking for length === 0 as well. validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]') return streams.pop() } function makeAsyncIterable(val) { if (isIterable(val)) { return val } else if (isReadableNodeStream(val)) { // Legacy streams are not Iterable. return fromReadable(val) } throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val) } async function* fromReadable(val) { if (!Readable) { Readable = require('./readable') } yield* Readable.prototype[SymbolAsyncIterator].call(val) } async function pump(iterable, writable, finish, { end }) { let error let onresolve = null const resume = (err) => { if (err) { error = err } if (onresolve) { const callback = onresolve onresolve = null callback() } } const wait = () => new Promise((resolve, reject) => { if (error) { reject(error) } else { onresolve = () => { if (error) { reject(error) } else { resolve() } } } }) writable.on('drain', resume) const cleanup = eos( writable, { readable: false }, resume ) try { if (writable.writableNeedDrain) { await wait() } for await (const chunk of iterable) { if (!writable.write(chunk)) { await wait() } } if (end) { writable.end() } await wait() finish() } catch (err) { finish(error !== err ? aggregateTwoErrors(error, err) : err) } finally { cleanup() writable.off('drain', resume) } } function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))) } function pipelineImpl(streams, callback, opts) { if (streams.length === 1 && ArrayIsArray(streams[0])) { streams = streams[0] } if (streams.length < 2) { throw new ERR_MISSING_ARGS('streams') } const ac = new AbortController() const signal = ac.signal const outerSignal = opts === null || opts === undefined ? undefined : opts.signal // Need to cleanup event listeners if last stream is readable // https://github.com/nodejs/node/issues/35452 const lastStreamCleanup = [] validateAbortSignal(outerSignal, 'options.signal') function abort() { finishImpl(new AbortError()) } outerSignal === null || outerSignal === undefined ? undefined : outerSignal.addEventListener('abort', abort) let error let value const destroys = [] let finishCount = 0 function finish(err) { finishImpl(err, --finishCount === 0) } function finishImpl(err, final) { if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { error = err } if (!error && !final) { return } while (destroys.length) { destroys.shift()(error) } outerSignal === null || outerSignal === undefined ? undefined : outerSignal.removeEventListener('abort', abort) ac.abort() if (final) { if (!error) { lastStreamCleanup.forEach((fn) => fn()) } process.nextTick(callback, error, value) } } let ret for (let i = 0; i < streams.length; i++) { const stream = streams[i] const reading = i < streams.length - 1 const writing = i > 0 const end = reading || (opts === null || opts === undefined ? undefined : opts.end) !== false const isLastStream = i === streams.length - 1 if (isNodeStream(stream)) { if (end) { const { destroy, cleanup } = destroyer(stream, reading, writing) destroys.push(destroy) if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup) } } // Catch stream errors that occur after pipe/pump has completed. function onError(err) { if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { finish(err) } } stream.on('error', onError) if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(() => { stream.removeListener('error', onError) }) } } if (i === 0) { if (typeof stream === 'function') { ret = stream({ signal }) if (!isIterable(ret)) { throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret) } } else if (isIterable(stream) || isReadableNodeStream(stream)) { ret = stream } else { ret = Duplex.from(stream) } } else if (typeof stream === 'function') { ret = makeAsyncIterable(ret) ret = stream(ret, { signal }) if (reading) { if (!isIterable(ret, true)) { throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret) } } else { var _ret if (!PassThrough) { PassThrough = require('./passthrough') } // If the last argument to pipeline is not a stream // we must create a proxy stream so that pipeline(...) // always returns a stream which can be further // composed through `.pipe(stream)`. const pt = new PassThrough({ objectMode: true }) // Handle Promises/A+ spec, `then` could be a getter that throws on // second use. const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then if (typeof then === 'function') { finishCount++ then.call( ret, (val) => { value = val if (val != null) { pt.write(val) } if (end) { pt.end() } process.nextTick(finish) }, (err) => { pt.destroy(err) process.nextTick(finish, err) } ) } else if (isIterable(ret, true)) { finishCount++ pump(ret, pt, finish, { end }) } else { throw new ERR_INVALID_RETURN_VALUE('AsyncIterable or Promise', 'destination', ret) } ret = pt const { destroy, cleanup } = destroyer(ret, false, true) destroys.push(destroy) if (isLastStream) { lastStreamCleanup.push(cleanup) } } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { finishCount += 2 const cleanup = pipe(ret, stream, finish, { end }) if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup) } } else if (isIterable(ret)) { finishCount++ pump(ret, stream, finish, { end }) } else { throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret) } ret = stream } else { ret = Duplex.from(stream) } } if ( (signal !== null && signal !== undefined && signal.aborted) || (outerSignal !== null && outerSignal !== undefined && outerSignal.aborted) ) { process.nextTick(abort) } return ret } function pipe(src, dst, finish, { end }) { src.pipe(dst, { end }) if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. src.once('end', () => dst.end()) } else { finish() } eos( src, { readable: true, writable: false }, (err) => { const rState = src._readableState if ( err && err.code === 'ERR_STREAM_PREMATURE_CLOSE' && rState && rState.ended && !rState.errored && !rState.errorEmitted ) { // Some readable streams will emit 'close' before 'end'. However, since // this is on the readable side 'end' should still be emitted if the // stream has been ended and no error emitted. This should be allowed in // favor of backwards compatibility. Since the stream is piped to a // destination this should not result in any observable difference. // We don't need to check if this is a writable premature close since // eos will only fail with premature close on the reading side for // duplex streams. src.once('end', finish).once('error', finish) } else { finish(err) } } ) return eos( dst, { readable: false, writable: true }, finish ) } module.exports = { pipelineImpl, pipeline }