mirror of
https://github.com/advplyr/audiobookshelf.git
synced 2025-01-22 00:07:52 +01:00
483 lines
16 KiB
JavaScript
483 lines
16 KiB
JavaScript
const Ffmpeg = require('fluent-ffmpeg')
|
|
const EventEmitter = require('events')
|
|
const Path = require('path')
|
|
const fs = require('fs-extra')
|
|
const Logger = require('../Logger')
|
|
const { getId, secondsToTimestamp } = require('../utils/index')
|
|
const { writeConcatFile } = require('../utils/ffmpegHelpers')
|
|
const hlsPlaylistGenerator = require('../utils/hlsPlaylistGenerator')
|
|
|
|
const UserListeningSession = require('./UserListeningSession')
|
|
|
|
class Stream extends EventEmitter {
|
|
constructor(streamPath, client, audiobook, transcodeOptions = {}) {
|
|
super()
|
|
|
|
this.id = getId('str')
|
|
this.client = client
|
|
this.audiobook = audiobook
|
|
|
|
this.transcodeOptions = transcodeOptions
|
|
|
|
this.segmentLength = 6
|
|
this.maxSeekBackTime = 30
|
|
this.streamPath = Path.join(streamPath, this.id)
|
|
this.concatFilesPath = Path.join(this.streamPath, 'files.txt')
|
|
this.playlistPath = Path.join(this.streamPath, 'output.m3u8')
|
|
this.finalPlaylistPath = Path.join(this.streamPath, 'final-output.m3u8')
|
|
this.startTime = 0
|
|
|
|
this.ffmpeg = null
|
|
this.loop = null
|
|
this.isResetting = false
|
|
this.isClientInitialized = false
|
|
this.isTranscodeComplete = false
|
|
this.segmentsCreated = new Set()
|
|
this.furthestSegmentCreated = 0
|
|
this.clientCurrentTime = 0
|
|
|
|
this.listeningSession = new UserListeningSession()
|
|
this.listeningSession.setData(audiobook, client.user)
|
|
|
|
this.init()
|
|
}
|
|
|
|
get socket() {
|
|
return this.client ? this.client.socket || null : null
|
|
}
|
|
|
|
get audiobookId() {
|
|
return this.audiobook.id
|
|
}
|
|
|
|
get audiobookTitle() {
|
|
return this.audiobook ? this.audiobook.title : null
|
|
}
|
|
|
|
get totalDuration() {
|
|
return this.audiobook.duration
|
|
}
|
|
|
|
get tracksAudioFileType() {
|
|
if (!this.tracks.length) return null
|
|
return this.tracks[0].ext.toLowerCase().slice(1)
|
|
}
|
|
|
|
get hlsSegmentType() {
|
|
var hasFlac = this.tracks.find(t => t.ext.toLowerCase() === '.flac')
|
|
return hasFlac ? 'fmp4' : 'mpegts'
|
|
}
|
|
|
|
get segmentBasename() {
|
|
if (this.hlsSegmentType === 'fmp4') return 'output-%d.m4s'
|
|
return 'output-%d.ts'
|
|
}
|
|
|
|
get segmentStartNumber() {
|
|
if (!this.startTime) return 0
|
|
return Math.floor(Math.max(this.startTime - this.maxSeekBackTime, 0) / this.segmentLength)
|
|
}
|
|
|
|
get numSegments() {
|
|
var numSegs = Math.floor(this.totalDuration / this.segmentLength)
|
|
if (this.totalDuration - (numSegs * this.segmentLength) > 0) {
|
|
numSegs++
|
|
}
|
|
return numSegs
|
|
}
|
|
|
|
get tracks() {
|
|
return this.audiobook.tracks
|
|
}
|
|
|
|
get clientUser() {
|
|
return this.client ? this.client.user || {} : null
|
|
}
|
|
|
|
get userToken() {
|
|
return this.clientUser ? this.clientUser.token : null
|
|
}
|
|
|
|
get clientUserAudiobooks() {
|
|
return this.client ? this.clientUser.audiobooks || {} : null
|
|
}
|
|
|
|
get clientUserAudiobookData() {
|
|
return this.client ? this.clientUserAudiobooks[this.audiobookId] : null
|
|
}
|
|
|
|
get clientPlaylistUri() {
|
|
return `/hls/${this.id}/output.m3u8`
|
|
}
|
|
|
|
get clientProgress() {
|
|
if (!this.clientCurrentTime) return 0
|
|
var prog = Math.min(1, this.clientCurrentTime / this.totalDuration)
|
|
return Number(prog.toFixed(3))
|
|
}
|
|
|
|
get isAACEncodable() {
|
|
return ['mp4', 'm4a', 'm4b'].includes(this.tracksAudioFileType)
|
|
}
|
|
|
|
get transcodeForceAAC() {
|
|
return !!this.transcodeOptions.forceAAC
|
|
}
|
|
|
|
toJSON() {
|
|
return {
|
|
id: this.id,
|
|
clientId: this.client.id,
|
|
userId: this.client.user.id,
|
|
audiobook: this.audiobook.toJSONExpanded(),
|
|
segmentLength: this.segmentLength,
|
|
playlistPath: this.playlistPath,
|
|
clientPlaylistUri: this.clientPlaylistUri,
|
|
clientCurrentTime: this.clientCurrentTime,
|
|
startTime: this.startTime,
|
|
segmentStartNumber: this.segmentStartNumber,
|
|
isTranscodeComplete: this.isTranscodeComplete,
|
|
lastUpdate: this.clientUserAudiobookData ? this.clientUserAudiobookData.lastUpdate : 0
|
|
}
|
|
}
|
|
|
|
init() {
|
|
if (this.clientUserAudiobookData) {
|
|
var timeRemaining = this.totalDuration - this.clientUserAudiobookData.currentTime
|
|
Logger.info('[STREAM] User has progress for audiobook', this.clientUserAudiobookData.progress, `Time Remaining: ${timeRemaining}s`)
|
|
if (timeRemaining > 15) {
|
|
this.startTime = this.clientUserAudiobookData.currentTime
|
|
this.clientCurrentTime = this.startTime
|
|
}
|
|
}
|
|
}
|
|
|
|
async checkSegmentNumberRequest(segNum) {
|
|
var segStartTime = segNum * this.segmentLength
|
|
if (this.startTime > segStartTime) {
|
|
Logger.warn(`[STREAM] Segment #${segNum} Request @${secondsToTimestamp(segStartTime)} is before start time (${secondsToTimestamp(this.startTime)}) - Reset Transcode`)
|
|
await this.reset(segStartTime - (this.segmentLength * 2))
|
|
return segStartTime
|
|
} else if (this.isTranscodeComplete) {
|
|
return false
|
|
}
|
|
|
|
var distanceFromFurthestSegment = segNum - this.furthestSegmentCreated
|
|
if (distanceFromFurthestSegment > 10) {
|
|
Logger.info(`Segment #${segNum} requested is ${distanceFromFurthestSegment} segments from latest (${secondsToTimestamp(segStartTime)}) - Reset Transcode`)
|
|
await this.reset(segStartTime - (this.segmentLength * 2))
|
|
return segStartTime
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
updateClientCurrentTime(currentTime) {
|
|
Logger.debug('[Stream] Updated client current time', secondsToTimestamp(currentTime))
|
|
this.clientCurrentTime = currentTime
|
|
}
|
|
|
|
syncStream({ timeListened, currentTime }) {
|
|
var syncLog = ''
|
|
// Set user current time
|
|
if (currentTime !== null && !isNaN(currentTime)) {
|
|
syncLog = `Update client current time ${secondsToTimestamp(currentTime)}`
|
|
this.clientCurrentTime = currentTime
|
|
}
|
|
|
|
// Update user listening session
|
|
var saveListeningSession = false
|
|
if (timeListened && !isNaN(timeListened)) {
|
|
|
|
// Check if listening session should roll to next day
|
|
if (this.listeningSession.checkDateRollover()) {
|
|
if (!this.clientUser) {
|
|
Logger.error(`[Stream] Sync stream invalid client user`)
|
|
return null
|
|
}
|
|
this.listeningSession = new UserListeningSession()
|
|
this.listeningSession.setData(this.audiobook, this.clientUser)
|
|
Logger.debug(`[Stream] Listening session rolled to next day`)
|
|
}
|
|
|
|
this.listeningSession.addListeningTime(timeListened)
|
|
if (syncLog) syncLog += ' | '
|
|
syncLog += `Add listening time ${timeListened}s, Total time listened ${this.listeningSession.timeListening}s`
|
|
saveListeningSession = true
|
|
}
|
|
|
|
Logger.debug('[Stream]', syncLog)
|
|
return saveListeningSession ? this.listeningSession : null
|
|
}
|
|
|
|
async generatePlaylist() {
|
|
fs.ensureDirSync(this.streamPath)
|
|
await hlsPlaylistGenerator(this.playlistPath, 'output', this.totalDuration, this.segmentLength, this.hlsSegmentType, this.userToken)
|
|
return this.clientPlaylistUri
|
|
}
|
|
|
|
async checkFiles() {
|
|
try {
|
|
var files = await fs.readdir(this.streamPath)
|
|
files.forEach((file) => {
|
|
var extname = Path.extname(file)
|
|
if (extname === '.ts' || extname === '.m4s') {
|
|
var basename = Path.basename(file, extname)
|
|
var num_part = basename.split('-')[1]
|
|
var part_num = Number(num_part)
|
|
this.segmentsCreated.add(part_num)
|
|
}
|
|
})
|
|
|
|
if (!this.segmentsCreated.size) {
|
|
Logger.warn('No Segments')
|
|
return
|
|
}
|
|
|
|
if (this.segmentsCreated.size > 6 && !this.isClientInitialized) {
|
|
this.isClientInitialized = true
|
|
if (this.socket) {
|
|
Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
|
|
this.socket.emit('stream_open', this.toJSON())
|
|
}
|
|
}
|
|
|
|
var chunks = []
|
|
var current_chunk = []
|
|
var last_seg_in_chunk = -1
|
|
|
|
var segments = Array.from(this.segmentsCreated).sort((a, b) => a - b);
|
|
var lastSegment = segments[segments.length - 1]
|
|
if (lastSegment > this.furthestSegmentCreated) {
|
|
this.furthestSegmentCreated = lastSegment
|
|
}
|
|
|
|
segments.forEach((seg) => {
|
|
if (!current_chunk.length || last_seg_in_chunk + 1 === seg) {
|
|
last_seg_in_chunk = seg
|
|
current_chunk.push(seg)
|
|
} else {
|
|
if (current_chunk.length === 1) chunks.push(current_chunk[0])
|
|
else chunks.push(`${current_chunk[0]}-${current_chunk[current_chunk.length - 1]}`)
|
|
last_seg_in_chunk = seg
|
|
current_chunk = [seg]
|
|
}
|
|
})
|
|
if (current_chunk.length) {
|
|
if (current_chunk.length === 1) chunks.push(current_chunk[0])
|
|
else chunks.push(`${current_chunk[0]}-${current_chunk[current_chunk.length - 1]}`)
|
|
}
|
|
|
|
var perc = (this.segmentsCreated.size * 100 / this.numSegments).toFixed(2) + '%'
|
|
Logger.info('[STREAM-CHECK] Check Files', this.segmentsCreated.size, 'of', this.numSegments, perc, `Furthest Segment: ${this.furthestSegmentCreated}`)
|
|
// Logger.debug('[STREAM-CHECK] Chunks', chunks.join(', '))
|
|
|
|
if (this.socket) {
|
|
this.socket.emit('stream_progress', {
|
|
stream: this.id,
|
|
percent: perc,
|
|
chunks,
|
|
numSegments: this.numSegments
|
|
})
|
|
}
|
|
} catch (error) {
|
|
Logger.error('Failed checking files', error)
|
|
}
|
|
}
|
|
|
|
startLoop() {
|
|
// Logger.info(`[Stream] ${this.audiobookTitle} (${this.id}) Start Loop`)
|
|
if (this.socket) {
|
|
this.socket.emit('stream_progress', { stream: this.id, chunks: [], numSegments: 0, percent: '0%' })
|
|
}
|
|
|
|
clearInterval(this.loop)
|
|
var intervalId = setInterval(() => {
|
|
if (!this.isTranscodeComplete) {
|
|
this.checkFiles()
|
|
} else {
|
|
if (this.socket) {
|
|
Logger.info(`[Stream] ${this.audiobookTitle} sending stream_ready`)
|
|
this.socket.emit('stream_ready')
|
|
}
|
|
clearInterval(intervalId)
|
|
}
|
|
}, 2000)
|
|
this.loop = intervalId
|
|
}
|
|
|
|
async start() {
|
|
Logger.info(`[STREAM] START STREAM - Num Segments: ${this.numSegments}`)
|
|
|
|
this.ffmpeg = Ffmpeg()
|
|
|
|
var adjustedStartTime = Math.max(this.startTime - this.maxSeekBackTime, 0)
|
|
var trackStartTime = await writeConcatFile(this.tracks, this.concatFilesPath, adjustedStartTime)
|
|
|
|
this.ffmpeg.addInput(this.concatFilesPath)
|
|
// seek_timestamp : https://ffmpeg.org/ffmpeg.html
|
|
// the argument to the -ss option is considered an actual timestamp, and is not offset by the start time of the file
|
|
// fixes https://github.com/advplyr/audiobookshelf/issues/116
|
|
this.ffmpeg.inputOption('-seek_timestamp 1')
|
|
this.ffmpeg.inputFormat('concat')
|
|
this.ffmpeg.inputOption('-safe 0')
|
|
|
|
if (adjustedStartTime > 0) {
|
|
const shiftedStartTime = adjustedStartTime - trackStartTime
|
|
// Issues using exact fractional seconds i.e. 29.49814 - changing to 29.5s
|
|
var startTimeS = Math.round(shiftedStartTime * 10) / 10 + 's'
|
|
Logger.info(`[STREAM] Starting Stream at startTime ${secondsToTimestamp(adjustedStartTime)} (User startTime ${secondsToTimestamp(this.startTime)}) and Segment #${this.segmentStartNumber}`)
|
|
this.ffmpeg.inputOption(`-ss ${startTimeS}`)
|
|
|
|
this.ffmpeg.inputOption('-noaccurate_seek')
|
|
}
|
|
|
|
const logLevel = process.env.NODE_ENV === 'production' ? 'error' : 'warning'
|
|
const audioCodec = (this.hlsSegmentType === 'fmp4' || this.tracksAudioFileType === 'opus' || this.transcodeForceAAC) ? 'aac' : 'copy'
|
|
this.ffmpeg.addOption([
|
|
`-loglevel ${logLevel}`,
|
|
'-map 0:a',
|
|
`-c:a ${audioCodec}`
|
|
])
|
|
const hlsOptions = [
|
|
'-f hls',
|
|
"-copyts",
|
|
"-avoid_negative_ts make_non_negative",
|
|
"-max_delay 5000000",
|
|
"-max_muxing_queue_size 2048",
|
|
`-hls_time 6`,
|
|
`-hls_segment_type ${this.hlsSegmentType}`,
|
|
`-start_number ${this.segmentStartNumber}`,
|
|
"-hls_playlist_type vod",
|
|
"-hls_list_size 0",
|
|
"-hls_allow_cache 0"
|
|
]
|
|
if (this.hlsSegmentType === 'fmp4') {
|
|
hlsOptions.push('-strict -2')
|
|
// var fmp4InitFilename = Path.join(this.streamPath, 'init.mp4')
|
|
var fmp4InitFilename = 'init.mp4'
|
|
hlsOptions.push(`-hls_fmp4_init_filename ${fmp4InitFilename}`)
|
|
}
|
|
this.ffmpeg.addOption(hlsOptions)
|
|
var segmentFilename = Path.join(this.streamPath, this.segmentBasename)
|
|
this.ffmpeg.addOption(`-hls_segment_filename ${segmentFilename}`)
|
|
this.ffmpeg.output(this.finalPlaylistPath)
|
|
|
|
this.ffmpeg.on('start', (command) => {
|
|
Logger.info('[INFO] FFMPEG transcoding started with command: ' + command)
|
|
Logger.info('')
|
|
if (this.isResetting) {
|
|
// AAC encode is much slower
|
|
const clearIsResettingTime = this.transcodeForceAAC ? 3000 : 500
|
|
setTimeout(() => {
|
|
Logger.info('[STREAM] Clearing isResetting')
|
|
this.isResetting = false
|
|
this.startLoop()
|
|
}, clearIsResettingTime)
|
|
} else {
|
|
this.startLoop()
|
|
}
|
|
})
|
|
|
|
this.ffmpeg.on('stderr', (stdErrline) => {
|
|
Logger.info(stdErrline)
|
|
})
|
|
|
|
this.ffmpeg.on('error', (err, stdout, stderr) => {
|
|
if (err.message && err.message.includes('SIGKILL')) {
|
|
// This is an intentional SIGKILL
|
|
Logger.info('[FFMPEG] Transcode Killed')
|
|
this.ffmpeg = null
|
|
clearInterval(this.loop)
|
|
} else {
|
|
Logger.error('Ffmpeg Err', '"' + err.message + '"')
|
|
|
|
// Temporary workaround for https://github.com/advplyr/audiobookshelf/issues/172
|
|
const aacErrorMsg = 'ffmpeg exited with code 1: Could not write header for output file #0 (incorrect codec parameters ?)'
|
|
if (audioCodec === 'copy' && this.isAACEncodable && err.message && err.message.startsWith(aacErrorMsg)) {
|
|
Logger.info(`[Stream] Re-attempting stream with AAC encode`)
|
|
this.transcodeOptions.forceAAC = true
|
|
this.reset(this.startTime)
|
|
} else {
|
|
// Close stream show error
|
|
this.close(err.message)
|
|
}
|
|
}
|
|
})
|
|
|
|
this.ffmpeg.on('end', (stdout, stderr) => {
|
|
Logger.info('[FFMPEG] Transcoding ended')
|
|
// For very small fast load
|
|
if (!this.isClientInitialized) {
|
|
this.isClientInitialized = true
|
|
if (this.socket) {
|
|
Logger.info(`[STREAM] ${this.id} notifying client that stream is ready`)
|
|
this.socket.emit('stream_open', this.toJSON())
|
|
}
|
|
}
|
|
this.isTranscodeComplete = true
|
|
this.ffmpeg = null
|
|
clearInterval(this.loop)
|
|
})
|
|
|
|
this.ffmpeg.run()
|
|
}
|
|
|
|
async close(errorMessage = null) {
|
|
clearInterval(this.loop)
|
|
|
|
Logger.info('Closing Stream', this.id)
|
|
if (this.ffmpeg) {
|
|
this.ffmpeg.kill('SIGKILL')
|
|
}
|
|
|
|
await fs.remove(this.streamPath).then(() => {
|
|
Logger.info('Deleted session data', this.streamPath)
|
|
}).catch((err) => {
|
|
Logger.error('Failed to delete session data', err)
|
|
})
|
|
|
|
if (this.socket) {
|
|
if (errorMessage) this.socket.emit('stream_error', { id: this.id, error: (errorMessage || '').trim() })
|
|
else this.socket.emit('stream_closed', this.id)
|
|
}
|
|
|
|
this.emit('closed')
|
|
}
|
|
|
|
cancelTranscode() {
|
|
clearInterval(this.loop)
|
|
if (this.ffmpeg) {
|
|
this.ffmpeg.kill('SIGKILL')
|
|
}
|
|
}
|
|
|
|
async waitCancelTranscode() {
|
|
for (let i = 0; i < 20; i++) {
|
|
if (!this.ffmpeg) return true
|
|
await new Promise((resolve) => setTimeout(resolve, 500))
|
|
}
|
|
Logger.error('[STREAM] Transcode never closed...')
|
|
return false
|
|
}
|
|
|
|
async reset(time) {
|
|
if (this.isResetting) {
|
|
return Logger.info(`[STREAM] Stream ${this.id} already resetting`)
|
|
}
|
|
time = Math.max(0, time)
|
|
this.isResetting = true
|
|
|
|
if (this.ffmpeg) {
|
|
this.cancelTranscode()
|
|
await this.waitCancelTranscode()
|
|
}
|
|
|
|
this.isTranscodeComplete = false
|
|
this.startTime = time
|
|
this.clientCurrentTime = this.startTime
|
|
Logger.info(`Stream Reset New Start Time ${secondsToTimestamp(this.startTime)}`)
|
|
this.start()
|
|
}
|
|
}
|
|
module.exports = Stream |