'use strict'

const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes

/**
 * Creates a readable stream whose chunks come from `iterable`.
 *
 * - A string or Buffer is pushed as one single chunk, followed by end-of-stream.
 * - Otherwise the value must expose `SymbolAsyncIterator` (checked first) or
 *   `SymbolIterator`; each produced value is pushed as one chunk. Values that
 *   have a `then` method are awaited before being pushed.
 *
 * @param {Function} Readable Constructor invoked with `new` to build the stream.
 * @param {*} iterable String, Buffer, async iterable or sync iterable.
 * @param {object} [opts] Extra constructor options. They are spread after the
 *   defaults (`objectMode: true`, and `highWaterMark: 1` for iterables), so
 *   they can override them.
 * @returns {object} The stream instance created by `Readable`.
 * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is none of the accepted kinds.
 */
function from(Readable, iterable, opts) {
  const isSingleChunk = typeof iterable === 'string' || iterable instanceof Buffer
  if (isSingleChunk) {
    // `read` is listed after the spread, so a `read` supplied in opts is ignored here.
    return new Readable({
      objectMode: true,
      ...opts,
      read() {
        this.push(iterable)
        this.push(null)
      }
    })
  }

  // Select the iteration protocol; the async one wins when both are present.
  let source
  let usesAsyncProtocol
  if (iterable && iterable[SymbolAsyncIterator]) {
    usesAsyncProtocol = true
    source = iterable[SymbolAsyncIterator]()
  } else if (iterable && iterable[SymbolIterator]) {
    usesAsyncProtocol = false
    source = iterable[SymbolIterator]()
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
  }

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  })

  // True while a pump() run is still in charge of pulling values, so that a
  // _read call arriving before the previous iteration completed is a no-op.
  let pumping = false

  /**
   * Pulls values from the iterator and pushes them into the stream until
   * push() reports the consumer is full, the iterator is done, or an error
   * occurs (in which case the stream is destroyed with that error).
   */
  async function pump() {
    let keepPulling
    do {
      keepPulling = false
      try {
        let step
        if (usesAsyncProtocol) {
          step = await source.next()
        } else {
          // Sync iterators are stepped without awaiting the step itself.
          step = source.next()
        }
        const { value: item, done: exhausted } = step

        if (exhausted) {
          readable.push(null)
        } else {
          let chunk
          if (item && typeof item.then === 'function') {
            chunk = await item
          } else {
            chunk = item
          }

          if (chunk === null) {
            // null is rejected as a chunk value; the catch below destroys the stream.
            pumping = false
            throw new ERR_STREAM_NULL_VALUES()
          }

          keepPulling = readable.push(chunk)
          if (!keepPulling) {
            // push() asked us to stop; let the next _read start a new run.
            pumping = false
          }
        }
      } catch (err) {
        readable.destroy(err)
      }
    } while (keepPulling)
  }

  /**
   * Gives the iterator a chance to clean up when the stream is destroyed.
   * With an error and a `throw` method, the error is forwarded through
   * `throw()`; unless that reports `done`, `return()` is called when present.
   */
  async function finalizeIterator(error) {
    const failed = !(error === undefined || error === null)
    const canThrow = typeof source.throw === 'function'
    if (failed && canThrow) {
      const { value: thrownValue, done: finished } = await source.throw(error)
      await thrownValue
      if (finished) {
        return
      }
    }
    if (typeof source.return === 'function') {
      const { value: returnedValue } = await source.return()
      await returnedValue
    }
  }

  readable._read = function () {
    if (pumping) {
      return
    }
    pumping = true
    pump()
  }

  readable._destroy = function (error, cb) {
    const closing = finalizeIterator(error)
    // cb is deferred through nextTick in case cb throws.
    const onClosed = () => process.nextTick(cb, error)
    const onCloseFailed = (closeError) => process.nextTick(cb, closeError || error)
    PromisePrototypeThen(closing, onClosed, onCloseFailed)
  }

  return readable
}

module.exports = from