mirror of
				https://github.com/advplyr/audiobookshelf.git
				synced 2025-10-27 11:18:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			423 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			423 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
'use strict'
 | 
						|
 | 
						|
const abortControllerModule = require('../../../../../watcher/aborter/controller')
 | 
						|
 | 
						|
const bufferModule = require('buffer')
 | 
						|
 | 
						|
const {
 | 
						|
  isReadable,
 | 
						|
  isWritable,
 | 
						|
  isIterable,
 | 
						|
  isNodeStream,
 | 
						|
  isReadableNodeStream,
 | 
						|
  isWritableNodeStream,
 | 
						|
  isDuplexNodeStream
 | 
						|
} = require('./utils')
 | 
						|
 | 
						|
const eos = require('./end-of-stream')
 | 
						|
 | 
						|
const {
 | 
						|
  AbortError,
 | 
						|
  codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE }
 | 
						|
} = require('../../ours/errors')
 | 
						|
 | 
						|
const { destroyer } = require('./destroy')
 | 
						|
 | 
						|
const Duplex = require('./duplex')
 | 
						|
 | 
						|
const Readable = require('./readable')
 | 
						|
 | 
						|
const { createDeferredPromise } = require('../../ours/util')
 | 
						|
 | 
						|
const from = require('./from')
 | 
						|
 | 
						|
const Blob = globalThis.Blob || bufferModule.Blob
 | 
						|
const isBlob =
 | 
						|
  typeof Blob !== 'undefined'
 | 
						|
    ? function isBlob(b) {
 | 
						|
      return b instanceof Blob
 | 
						|
    }
 | 
						|
    : function isBlob(b) {
 | 
						|
      return false
 | 
						|
    }
 | 
						|
const AbortController = globalThis.AbortController || abortControllerModule.AbortController
 | 
						|
 | 
						|
const { FunctionPrototypeCall } = require('../../ours/primordials') // This is needed for pre node 17.
 | 
						|
 | 
						|
class Duplexify extends Duplex {
 | 
						|
  constructor(options) {
 | 
						|
    super(options) // https://github.com/nodejs/node/pull/34385
 | 
						|
 | 
						|
    if ((options === null || options === undefined ? undefined : options.readable) === false) {
 | 
						|
      this._readableState.readable = false
 | 
						|
      this._readableState.ended = true
 | 
						|
      this._readableState.endEmitted = true
 | 
						|
    }
 | 
						|
 | 
						|
    if ((options === null || options === undefined ? undefined : options.writable) === false) {
 | 
						|
      this._writableState.writable = false
 | 
						|
      this._writableState.ending = true
 | 
						|
      this._writableState.ended = true
 | 
						|
      this._writableState.finished = true
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
module.exports = function duplexify(body, name) {
 | 
						|
  if (isDuplexNodeStream(body)) {
 | 
						|
    return body
 | 
						|
  }
 | 
						|
 | 
						|
  if (isReadableNodeStream(body)) {
 | 
						|
    return _duplexify({
 | 
						|
      readable: body
 | 
						|
    })
 | 
						|
  }
 | 
						|
 | 
						|
  if (isWritableNodeStream(body)) {
 | 
						|
    return _duplexify({
 | 
						|
      writable: body
 | 
						|
    })
 | 
						|
  }
 | 
						|
 | 
						|
  if (isNodeStream(body)) {
 | 
						|
    return _duplexify({
 | 
						|
      writable: false,
 | 
						|
      readable: false
 | 
						|
    })
 | 
						|
  } // TODO: Webstreams
 | 
						|
  // if (isReadableStream(body)) {
 | 
						|
  //   return _duplexify({ readable: Readable.fromWeb(body) });
 | 
						|
  // }
 | 
						|
  // TODO: Webstreams
 | 
						|
  // if (isWritableStream(body)) {
 | 
						|
  //   return _duplexify({ writable: Writable.fromWeb(body) });
 | 
						|
  // }
 | 
						|
 | 
						|
  if (typeof body === 'function') {
 | 
						|
    const { value, write, final, destroy } = fromAsyncGen(body)
 | 
						|
 | 
						|
    if (isIterable(value)) {
 | 
						|
      return from(Duplexify, value, {
 | 
						|
        // TODO (ronag): highWaterMark?
 | 
						|
        objectMode: true,
 | 
						|
        write,
 | 
						|
        final,
 | 
						|
        destroy
 | 
						|
      })
 | 
						|
    }
 | 
						|
 | 
						|
    const then = value === null || value === undefined ? undefined : value.then
 | 
						|
 | 
						|
    if (typeof then === 'function') {
 | 
						|
      let d
 | 
						|
      const promise = FunctionPrototypeCall(
 | 
						|
        then,
 | 
						|
        value,
 | 
						|
        (val) => {
 | 
						|
          if (val != null) {
 | 
						|
            throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val)
 | 
						|
          }
 | 
						|
        },
 | 
						|
        (err) => {
 | 
						|
          destroyer(d, err)
 | 
						|
        }
 | 
						|
      )
 | 
						|
      return (d = new Duplexify({
 | 
						|
        // TODO (ronag): highWaterMark?
 | 
						|
        objectMode: true,
 | 
						|
        readable: false,
 | 
						|
        write,
 | 
						|
 | 
						|
        final(cb) {
 | 
						|
          final(async () => {
 | 
						|
            try {
 | 
						|
              await promise
 | 
						|
              process.nextTick(cb, null)
 | 
						|
            } catch (err) {
 | 
						|
              process.nextTick(cb, err)
 | 
						|
            }
 | 
						|
          })
 | 
						|
        },
 | 
						|
 | 
						|
        destroy
 | 
						|
      }))
 | 
						|
    }
 | 
						|
 | 
						|
    throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value)
 | 
						|
  }
 | 
						|
 | 
						|
  if (isBlob(body)) {
 | 
						|
    return duplexify(body.arrayBuffer())
 | 
						|
  }
 | 
						|
 | 
						|
  if (isIterable(body)) {
 | 
						|
    return from(Duplexify, body, {
 | 
						|
      // TODO (ronag): highWaterMark?
 | 
						|
      objectMode: true,
 | 
						|
      writable: false
 | 
						|
    })
 | 
						|
  } // TODO: Webstreams.
 | 
						|
  // if (
 | 
						|
  //   isReadableStream(body?.readable) &&
 | 
						|
  //   isWritableStream(body?.writable)
 | 
						|
  // ) {
 | 
						|
  //   return Duplexify.fromWeb(body);
 | 
						|
  // }
 | 
						|
 | 
						|
  if (
 | 
						|
    typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
 | 
						|
    typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
 | 
						|
  ) {
 | 
						|
    const readable =
 | 
						|
      body !== null && body !== undefined && body.readable
 | 
						|
        ? isReadableNodeStream(body === null || body === undefined ? undefined : body.readable)
 | 
						|
          ? body === null || body === undefined
 | 
						|
            ? undefined
 | 
						|
            : body.readable
 | 
						|
          : duplexify(body.readable)
 | 
						|
        : undefined
 | 
						|
    const writable =
 | 
						|
      body !== null && body !== undefined && body.writable
 | 
						|
        ? isWritableNodeStream(body === null || body === undefined ? undefined : body.writable)
 | 
						|
          ? body === null || body === undefined
 | 
						|
            ? undefined
 | 
						|
            : body.writable
 | 
						|
          : duplexify(body.writable)
 | 
						|
        : undefined
 | 
						|
    return _duplexify({
 | 
						|
      readable,
 | 
						|
      writable
 | 
						|
    })
 | 
						|
  }
 | 
						|
 | 
						|
  const then = body === null || body === undefined ? undefined : body.then
 | 
						|
 | 
						|
  if (typeof then === 'function') {
 | 
						|
    let d
 | 
						|
    FunctionPrototypeCall(
 | 
						|
      then,
 | 
						|
      body,
 | 
						|
      (val) => {
 | 
						|
        if (val != null) {
 | 
						|
          d.push(val)
 | 
						|
        }
 | 
						|
 | 
						|
        d.push(null)
 | 
						|
      },
 | 
						|
      (err) => {
 | 
						|
        destroyer(d, err)
 | 
						|
      }
 | 
						|
    )
 | 
						|
    return (d = new Duplexify({
 | 
						|
      objectMode: true,
 | 
						|
      writable: false,
 | 
						|
 | 
						|
      read() { }
 | 
						|
    }))
 | 
						|
  }
 | 
						|
 | 
						|
  throw new ERR_INVALID_ARG_TYPE(
 | 
						|
    name,
 | 
						|
    [
 | 
						|
      'Blob',
 | 
						|
      'ReadableStream',
 | 
						|
      'WritableStream',
 | 
						|
      'Stream',
 | 
						|
      'Iterable',
 | 
						|
      'AsyncIterable',
 | 
						|
      'Function',
 | 
						|
      '{ readable, writable } pair',
 | 
						|
      'Promise'
 | 
						|
    ],
 | 
						|
    body
 | 
						|
  )
 | 
						|
}
 | 
						|
 | 
						|
function fromAsyncGen(fn) {
 | 
						|
  let { promise, resolve } = createDeferredPromise()
 | 
						|
  const ac = new AbortController()
 | 
						|
  const signal = ac.signal
 | 
						|
  const value = fn(
 | 
						|
    (async function* () {
 | 
						|
      while (true) {
 | 
						|
        const _promise = promise
 | 
						|
        promise = null
 | 
						|
        const { chunk, done, cb } = await _promise
 | 
						|
        process.nextTick(cb)
 | 
						|
        if (done) return
 | 
						|
        if (signal.aborted)
 | 
						|
          throw new AbortError(undefined, {
 | 
						|
            cause: signal.reason
 | 
						|
          })
 | 
						|
          ; ({ promise, resolve } = createDeferredPromise())
 | 
						|
        yield chunk
 | 
						|
      }
 | 
						|
    })(),
 | 
						|
    {
 | 
						|
      signal
 | 
						|
    }
 | 
						|
  )
 | 
						|
  return {
 | 
						|
    value,
 | 
						|
 | 
						|
    write(chunk, encoding, cb) {
 | 
						|
      const _resolve = resolve
 | 
						|
      resolve = null
 | 
						|
 | 
						|
      _resolve({
 | 
						|
        chunk,
 | 
						|
        done: false,
 | 
						|
        cb
 | 
						|
      })
 | 
						|
    },
 | 
						|
 | 
						|
    final(cb) {
 | 
						|
      const _resolve = resolve
 | 
						|
      resolve = null
 | 
						|
 | 
						|
      _resolve({
 | 
						|
        done: true,
 | 
						|
        cb
 | 
						|
      })
 | 
						|
    },
 | 
						|
 | 
						|
    destroy(err, cb) {
 | 
						|
      ac.abort()
 | 
						|
      cb(err)
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
function _duplexify(pair) {
 | 
						|
  const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable
 | 
						|
  const w = pair.writable
 | 
						|
  let readable = !!isReadable(r)
 | 
						|
  let writable = !!isWritable(w)
 | 
						|
  let ondrain
 | 
						|
  let onfinish
 | 
						|
  let onreadable
 | 
						|
  let onclose
 | 
						|
  let d
 | 
						|
 | 
						|
  function onfinished(err) {
 | 
						|
    const cb = onclose
 | 
						|
    onclose = null
 | 
						|
 | 
						|
    if (cb) {
 | 
						|
      cb(err)
 | 
						|
    } else if (err) {
 | 
						|
      d.destroy(err)
 | 
						|
    } else if (!readable && !writable) {
 | 
						|
      d.destroy()
 | 
						|
    }
 | 
						|
  } // TODO(ronag): Avoid double buffering.
 | 
						|
  // Implement Writable/Readable/Duplex traits.
 | 
						|
  // See, https://github.com/nodejs/node/pull/33515.
 | 
						|
 | 
						|
  d = new Duplexify({
 | 
						|
    // TODO (ronag): highWaterMark?
 | 
						|
    readableObjectMode: !!(r !== null && r !== undefined && r.readableObjectMode),
 | 
						|
    writableObjectMode: !!(w !== null && w !== undefined && w.writableObjectMode),
 | 
						|
    readable,
 | 
						|
    writable
 | 
						|
  })
 | 
						|
 | 
						|
  if (writable) {
 | 
						|
    eos(w, (err) => {
 | 
						|
      writable = false
 | 
						|
 | 
						|
      if (err) {
 | 
						|
        destroyer(r, err)
 | 
						|
      }
 | 
						|
 | 
						|
      onfinished(err)
 | 
						|
    })
 | 
						|
 | 
						|
    d._write = function (chunk, encoding, callback) {
 | 
						|
      if (w.write(chunk, encoding)) {
 | 
						|
        callback()
 | 
						|
      } else {
 | 
						|
        ondrain = callback
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    d._final = function (callback) {
 | 
						|
      w.end()
 | 
						|
      onfinish = callback
 | 
						|
    }
 | 
						|
 | 
						|
    w.on('drain', function () {
 | 
						|
      if (ondrain) {
 | 
						|
        const cb = ondrain
 | 
						|
        ondrain = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
    w.on('finish', function () {
 | 
						|
      if (onfinish) {
 | 
						|
        const cb = onfinish
 | 
						|
        onfinish = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
  }
 | 
						|
 | 
						|
  if (readable) {
 | 
						|
    eos(r, (err) => {
 | 
						|
      readable = false
 | 
						|
 | 
						|
      if (err) {
 | 
						|
        destroyer(r, err)
 | 
						|
      }
 | 
						|
 | 
						|
      onfinished(err)
 | 
						|
    })
 | 
						|
    r.on('readable', function () {
 | 
						|
      if (onreadable) {
 | 
						|
        const cb = onreadable
 | 
						|
        onreadable = null
 | 
						|
        cb()
 | 
						|
      }
 | 
						|
    })
 | 
						|
    r.on('end', function () {
 | 
						|
      d.push(null)
 | 
						|
    })
 | 
						|
 | 
						|
    d._read = function () {
 | 
						|
      while (true) {
 | 
						|
        const buf = r.read()
 | 
						|
 | 
						|
        if (buf === null) {
 | 
						|
          onreadable = d._read
 | 
						|
          return
 | 
						|
        }
 | 
						|
 | 
						|
        if (!d.push(buf)) {
 | 
						|
          return
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  d._destroy = function (err, callback) {
 | 
						|
    if (!err && onclose !== null) {
 | 
						|
      err = new AbortError()
 | 
						|
    }
 | 
						|
 | 
						|
    onreadable = null
 | 
						|
    ondrain = null
 | 
						|
    onfinish = null
 | 
						|
 | 
						|
    if (onclose === null) {
 | 
						|
      callback(err)
 | 
						|
    } else {
 | 
						|
      onclose = callback
 | 
						|
      destroyer(w, err)
 | 
						|
      destroyer(r, err)
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return d
 | 
						|
}
 |