const Path = require('path') const fs = require('../libs/fsExtra') const archiver = require('../libs/archiver') const workerThreads = require('worker_threads') const Logger = require('../Logger') const Download = require('../objects/Download') const filePerms = require('../utils/filePerms') const { getId } = require('../utils/index') const { writeConcatFile, writeMetadataFile } = require('../utils/ffmpegHelpers') const { getFileSize } = require('../utils/fileUtils') const TAG = 'DownloadManager' class DownloadManager { constructor(db) { this.db = db this.downloadDirPath = Path.join(global.MetadataPath, 'downloads') this.pendingDownloads = [] this.downloads = [] } getDownload(downloadId) { return this.downloads.find(d => d.id === downloadId) } downloadSocketRequest(socket, payload) { var client = socket.sheepClient var audiobook = this.db.audiobooks.find(a => a.id === payload.audiobookId) var options = { ...payload } delete options.audiobookId this.prepareDownload(client, audiobook, options) } removeSocketRequest(socket, downloadId) { var download = this.downloads.find(d => d.id === downloadId) if (!download) { Logger.error('Remove download request download not found ' + downloadId) return } this.removeDownload(download) } async prepareDownload(client, audiobook, options = {}) { var downloadId = getId('dl') var dlpath = Path.join(this.downloadDirPath, downloadId) Logger.info(`Start Download for ${audiobook.id} - DownloadId: ${downloadId} - ${dlpath}`) await fs.ensureDir(dlpath) var downloadType = options.type || 'singleAudio' delete options.type var fileext = null var audiobookDirname = Path.basename(audiobook.path) if (downloadType === 'singleAudio') { var audioFileType = options.audioFileType || '.m4b' delete options.audioFileType if (audioFileType === 'same') { var firstTrack = audiobook.tracks[0] audioFileType = firstTrack.ext } fileext = audioFileType } else if (downloadType === 'zip') { fileext = '.zip' } var filename = audiobookDirname + fileext var downloadData = { id: downloadId, audiobookId: audiobook.id, type: downloadType, options: options, dirpath: dlpath, fullPath: Path.join(dlpath, filename), filename, ext: fileext, userId: (client && client.user) ? client.user.id : null, socket: (client && client.socket) ? client.socket : null } var download = new Download() download.setData(downloadData) download.setTimeoutTimer(this.downloadTimedOut.bind(this)) if (downloadData.socket) { downloadData.socket.emit('download_started', download.toJSON()) } if (download.type === 'singleAudio') { this.processSingleAudioDownload(audiobook, download) } else if (download.type === 'zip') { this.processZipDownload(audiobook, download) } } async processZipDownload(audiobook, download) { this.pendingDownloads.push({ id: download.id, download }) Logger.info(`[DownloadManager] Processing Zip download ${download.fullPath}`) var success = await this.zipAudiobookDir(audiobook.fullPath, download.fullPath).then(() => { return true }).catch((error) => { Logger.error('[DownloadManager] Process Zip Failed', error) return false }) this.sendResult(download, { success }) } zipAudiobookDir(audiobookPath, downloadPath) { return new Promise((resolve, reject) => { // create a file to stream archive data to const output = fs.createWriteStream(downloadPath) const archive = archiver('zip', { zlib: { level: 9 } // Sets the compression level. }) // listen for all archive data to be written // 'close' event is fired only when a file descriptor is involved output.on('close', () => { Logger.info(archive.pointer() + ' total bytes') Logger.debug('archiver has been finalized and the output file descriptor has closed.') resolve() }) // This event is fired when the data source is drained no matter what was the data source. // It is not part of this library but rather from the NodeJS Stream API. // @see: https://nodejs.org/api/stream.html#stream_event_end output.on('end', () => { Logger.debug('Data has been drained') }) // good practice to catch warnings (ie stat failures and other non-blocking errors) archive.on('warning', function (err) { if (err.code === 'ENOENT') { // log warning Logger.warn(`[DownloadManager] Archiver warning: ${err.message}`) } else { // throw error Logger.error(`[DownloadManager] Archiver error: ${err.message}`) // throw err reject(err) } }) archive.on('error', function (err) { Logger.error(`[DownloadManager] Archiver error: ${err.message}`) reject(err) }) // pipe archive data to the file archive.pipe(output) archive.directory(audiobookPath, false) archive.finalize() }) } async processSingleAudioDownload(audiobook, download) { // If changing audio file type then encoding is needed var audioRequiresEncode = audiobook.tracks[0].ext !== download.ext var shouldIncludeCover = download.includeCover && audiobook.book.cover var firstTrackIsM4b = audiobook.tracks[0].ext.toLowerCase() === '.m4b' var isOneTrack = audiobook.tracks.length === 1 const ffmpegInputs = [] if (!isOneTrack) { var concatFilePath = Path.join(download.dirpath, 'files.txt') await writeConcatFile(audiobook.tracks, concatFilePath) ffmpegInputs.push({ input: concatFilePath, options: ['-safe 0', '-f concat'] }) } else { ffmpegInputs.push({ input: audiobook.tracks[0].fullPath, options: firstTrackIsM4b ? ['-f mp4'] : [] }) } const logLevel = process.env.NODE_ENV === 'production' ? 'error' : 'warning' var ffmpegOptions = [`-loglevel ${logLevel}`] var ffmpegOutputOptions = [] if (audioRequiresEncode) { ffmpegOptions = ffmpegOptions.concat([ '-map 0:a', '-acodec aac', '-ac 2', '-b:a 64k', '-id3v2_version 3' ]) } else { ffmpegOptions.push('-max_muxing_queue_size 1000') if (isOneTrack && firstTrackIsM4b && !shouldIncludeCover) { ffmpegOptions.push('-c copy') } else { ffmpegOptions.push('-c:a copy') } } if (download.ext === '.m4b') { Logger.info('Concat m4b\'s use -f mp4') ffmpegOutputOptions.push('-f mp4') } if (download.includeMetadata) { var metadataFilePath = Path.join(download.dirpath, 'metadata.txt') await writeMetadataFile(audiobook, metadataFilePath) ffmpegInputs.push({ input: metadataFilePath }) ffmpegOptions.push('-map_metadata 1') } if (shouldIncludeCover) { var _cover = audiobook.book.coverFullPath.replace(/\\/g, '/') ffmpegInputs.push({ input: _cover, options: ['-f image2pipe'] }) ffmpegOptions.push('-vf [2:v]crop=trunc(iw/2)*2:trunc(ih/2)*2') ffmpegOptions.push('-map 2:v') } var workerData = { inputs: ffmpegInputs, options: ffmpegOptions, outputOptions: ffmpegOutputOptions, output: download.fullPath, } var worker = null try { var workerPath = Path.join(global.appRoot, 'server/utils/downloadWorker.js') worker = new workerThreads.Worker(workerPath, { workerData }) } catch (error) { Logger.error(`[${TAG}] Start worker thread failed`, error) if (download.socket) { var downloadJson = download.toJSON() download.socket.emit('download_failed', downloadJson) } this.removeDownload(download) return } worker.on('message', (message) => { if (message != null && typeof message === 'object') { if (message.type === 'RESULT') { if (!download.isTimedOut) { this.sendResult(download, message) } } else if (message.type === 'FFMPEG') { if (Logger[message.level]) { Logger[message.level](message.log) } } } else { Logger.error('Invalid worker message', message) } }) this.pendingDownloads.push({ id: download.id, download, worker }) } async downloadTimedOut(download) { Logger.info(`[DownloadManager] Download ${download.id} timed out (${download.timeoutTimeMs}ms)`) if (download.socket) { var downloadJson = download.toJSON() downloadJson.isTimedOut = true download.socket.emit('download_failed', downloadJson) } this.removeDownload(download) } async downloadExpired(download) { Logger.info(`[DownloadManager] Download ${download.id} expired`) if (download.socket) { download.socket.emit('download_expired', download.toJSON()) } this.removeDownload(download) } async sendResult(download, result) { download.clearTimeoutTimer() // Remove pending download this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id) if (result.isKilled) { if (download.socket) { download.socket.emit('download_killed', download.toJSON()) } return } if (!result.success) { if (download.socket) { download.socket.emit('download_failed', download.toJSON()) } this.removeDownload(download) return } // Set file permissions and ownership await filePerms.setDefault(download.fullPath) var filesize = await getFileSize(download.fullPath) download.setComplete(filesize) if (download.socket) { download.socket.emit('download_ready', download.toJSON()) } download.setExpirationTimer(this.downloadExpired.bind(this)) this.downloads.push(download) Logger.info(`[DownloadManager] Download Ready ${download.id}`) } async removeDownload(download) { Logger.info('[DownloadManager] Removing download ' + download.id) download.clearTimeoutTimer() download.clearExpirationTimer() var pendingDl = this.pendingDownloads.find(d => d.id === download.id) if (pendingDl) { this.pendingDownloads = this.pendingDownloads.filter(d => d.id !== download.id) Logger.warn(`[DownloadManager] Removing download in progress - stopping worker`) if (pendingDl.worker) { try { pendingDl.worker.postMessage('STOP') } catch (error) { Logger.error('[DownloadManager] Error posting stop message to worker', error) } } } await fs.remove(download.dirpath).then(() => { Logger.info('[DownloadManager] Deleted download', download.dirpath) }).catch((err) => { Logger.error('[DownloadManager] Failed to delete download', err) }) this.downloads = this.downloads.filter(d => d.id !== download.id) } } module.exports = DownloadManager