'use strict'

const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')

const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes

function from(Readable, iterable, opts) {
  let iterator

  if (typeof iterable === 'string' || iterable instanceof Buffer) {
    return new Readable({
      objectMode: true,
      ...opts,

      read() {
        this.push(iterable)
        this.push(null)
      }
    })
  }

  let isAsync

  if (iterable && iterable[SymbolAsyncIterator]) {
    isAsync = true
    iterator = iterable[SymbolAsyncIterator]()
  } else if (iterable && iterable[SymbolIterator]) {
    isAsync = false
    iterator = iterable[SymbolIterator]()
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
  }

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  }) // Flag to protect against _read
  // being called before last iteration completion.

  let reading = false

  readable._read = function () {
    if (!reading) {
      reading = true
      next()
    }
  }

  readable._destroy = function (error, cb) {
    PromisePrototypeThen(
      close(error),
      () => process.nextTick(cb, error), // nextTick is here in case cb throws
      (e) => process.nextTick(cb, e || error)
    )
  }

  async function close(error) {
    const hadError = error !== undefined && error !== null
    const hasThrow = typeof iterator.throw === 'function'

    if (hadError && hasThrow) {
      const { value, done } = await iterator.throw(error)
      await value

      if (done) {
        return
      }
    }

    if (typeof iterator.return === 'function') {
      const { value } = await iterator.return()
      await value
    }
  }

  async function next() {
    for (;;) {
      try {
        const { value, done } = isAsync ? await iterator.next() : iterator.next()

        if (done) {
          readable.push(null)
        } else {
          const res = value && typeof value.then === 'function' ? await value : value

          if (res === null) {
            reading = false
            throw new ERR_STREAM_NULL_VALUES()
          } else if (readable.push(res)) {
            continue
          } else {
            reading = false
          }
        }
      } catch (err) {
        readable.destroy(err)
      }

      break
    }
  }

  return readable
}

module.exports = from