Fix:Watcher waits for files to finish transferring before scanning #1362 #2248

This commit is contained in:
advplyr 2023-10-25 16:53:53 -05:00
parent ef1cdf6ad2
commit 8dc4490169
2 changed files with 97 additions and 30 deletions

View File

@ -6,7 +6,7 @@ const LibraryScanner = require('./scanner/LibraryScanner')
const Task = require('./objects/Task') const Task = require('./objects/Task')
const TaskManager = require('./managers/TaskManager') const TaskManager = require('./managers/TaskManager')
const { filePathToPOSIX, isSameOrSubPath } = require('./utils/fileUtils') const { filePathToPOSIX, isSameOrSubPath, getFileMTimeMs } = require('./utils/fileUtils')
/** /**
* @typedef PendingFileUpdate * @typedef PendingFileUpdate
@ -29,6 +29,8 @@ class FolderWatcher extends EventEmitter {
/** @type {Task} */ /** @type {Task} */
this.pendingTask = null this.pendingTask = null
this.filesBeingAdded = new Set()
/** @type {string[]} */ /** @type {string[]} */
this.ignoreDirs = [] this.ignoreDirs = []
/** @type {string[]} */ /** @type {string[]} */
@ -64,14 +66,13 @@ class FolderWatcher extends EventEmitter {
}) })
watcher watcher
.on('add', (path) => { .on('add', (path) => {
this.onNewFile(library.id, path) this.onFileAdded(library.id, filePathToPOSIX(path))
}).on('change', (path) => { }).on('change', (path) => {
// This is triggered from metadata changes, not what we want // This is triggered from metadata changes, not what we want
// this.onFileUpdated(path)
}).on('unlink', path => { }).on('unlink', path => {
this.onFileRemoved(library.id, path) this.onFileRemoved(library.id, filePathToPOSIX(path))
}).on('rename', (path, pathNext) => { }).on('rename', (path, pathNext) => {
this.onRename(library.id, path, pathNext) this.onFileRename(library.id, filePathToPOSIX(path), filePathToPOSIX(pathNext))
}).on('error', (error) => { }).on('error', (error) => {
Logger.error(`[Watcher] ${error}`) Logger.error(`[Watcher] ${error}`)
}).on('ready', () => { }).on('ready', () => {
@ -137,14 +138,31 @@ class FolderWatcher extends EventEmitter {
return this.libraryWatchers.map(lib => lib.watcher.close()) return this.libraryWatchers.map(lib => lib.watcher.close())
} }
onNewFile(libraryId, path) { /**
* Watcher detected file added
*
* @param {string} libraryId
* @param {string} path
*/
onFileAdded(libraryId, path) {
if (this.checkShouldIgnorePath(path)) { if (this.checkShouldIgnorePath(path)) {
return return
} }
Logger.debug('[Watcher] File Added', path) Logger.debug('[Watcher] File Added', path)
this.addFileUpdate(libraryId, path, 'added') this.addFileUpdate(libraryId, path, 'added')
if (!this.filesBeingAdded.has(path)) {
this.filesBeingAdded.add(path)
this.waitForFileToAdd(path)
}
} }
/**
* Watcher detected file removed
*
* @param {string} libraryId
* @param {string} path
*/
onFileRemoved(libraryId, path) { onFileRemoved(libraryId, path) {
if (this.checkShouldIgnorePath(path)) { if (this.checkShouldIgnorePath(path)) {
return return
@ -153,11 +171,13 @@ class FolderWatcher extends EventEmitter {
this.addFileUpdate(libraryId, path, 'deleted') this.addFileUpdate(libraryId, path, 'deleted')
} }
onFileUpdated(path) { /**
Logger.debug('[Watcher] Updated File', path) * Watcher detected file renamed
} *
* @param {string} libraryId
onRename(libraryId, pathFrom, pathTo) { * @param {string} path
*/
onFileRename(libraryId, pathFrom, pathTo) {
if (this.checkShouldIgnorePath(pathTo)) { if (this.checkShouldIgnorePath(pathTo)) {
return return
} }
@ -166,13 +186,41 @@ class FolderWatcher extends EventEmitter {
} }
/** /**
* File update detected from watcher * Get mtimeMs from an added file every second until it is no longer changing
* Times out after 180s
*
* @param {string} path
* @param {number} [lastMTimeMs=0]
* @param {number} [loop=0]
*/
async waitForFileToAdd(path, lastMTimeMs = 0, loop = 0) {
// Safety to catch infinite loop (180s)
if (loop >= 180) {
Logger.warn(`[Watcher] Waiting to add file at "${path}" timeout (loop ${loop}) - proceeding`)
return this.filesBeingAdded.delete(path)
}
const mtimeMs = await getFileMTimeMs(path)
if (mtimeMs === lastMTimeMs) {
if (lastMTimeMs) Logger.debug(`[Watcher] File finished adding at "${path}"`)
return this.filesBeingAdded.delete(path)
}
if (lastMTimeMs % 5 === 0) {
Logger.debug(`[Watcher] Waiting to add file at "${path}". mtimeMs=${mtimeMs} lastMTimeMs=${lastMTimeMs} (loop ${loop})`)
}
// Wait 1 second
await new Promise((resolve) => setTimeout(resolve, 1000))
this.waitForFileToAdd(path, mtimeMs, ++loop)
}
/**
* Queue file update
*
* @param {string} libraryId * @param {string} libraryId
* @param {string} path * @param {string} path
* @param {string} type * @param {string} type
*/ */
addFileUpdate(libraryId, path, type) { addFileUpdate(libraryId, path, type) {
path = filePathToPOSIX(path)
if (this.pendingFilePaths.includes(path)) return if (this.pendingFilePaths.includes(path)) return
// Get file library // Get file library
@ -222,12 +270,26 @@ class FolderWatcher extends EventEmitter {
type type
}) })
// Notify server of update after "pendingDelay" this.handlePendingFileUpdatesTimeout()
}
/**
* Wait X seconds before notifying scanner that files changed
* reset timer if files are still copying
*/
handlePendingFileUpdatesTimeout() {
clearTimeout(this.pendingTimeout) clearTimeout(this.pendingTimeout)
this.pendingTimeout = setTimeout(() => { this.pendingTimeout = setTimeout(() => {
// Check that files are not still being added
if (this.pendingFileUpdates.some(pfu => this.filesBeingAdded.has(pfu.path))) {
Logger.debug(`[Watcher] Still waiting for pending files "${[...this.filesBeingAdded].join(', ')}"`)
return this.handlePendingFileUpdatesTimeout()
}
LibraryScanner.scanFilesChanged(this.pendingFileUpdates, this.pendingTask) LibraryScanner.scanFilesChanged(this.pendingFileUpdates, this.pendingTask)
this.pendingTask = null this.pendingTask = null
this.pendingFileUpdates = [] this.pendingFileUpdates = []
this.filesBeingAdded.clear()
}, this.pendingDelay) }, this.pendingDelay)
} }

View File

@ -38,22 +38,14 @@ function isSameOrSubPath(parentPath, childPath) {
} }
module.exports.isSameOrSubPath = isSameOrSubPath module.exports.isSameOrSubPath = isSameOrSubPath
async function getFileStat(path) { function getFileStat(path) {
try { try {
var stat = await fs.stat(path) return fs.stat(path)
return {
size: stat.size,
atime: stat.atime,
mtime: stat.mtime,
ctime: stat.ctime,
birthtime: stat.birthtime
}
} catch (err) { } catch (err) {
Logger.error('[fileUtils] Failed to stat', err) Logger.error('[fileUtils] Failed to stat', err)
return false return null
} }
} }
module.exports.getFileStat = getFileStat
async function getFileTimestampsWithIno(path) { async function getFileTimestampsWithIno(path) {
try { try {
@ -72,12 +64,25 @@ async function getFileTimestampsWithIno(path) {
} }
module.exports.getFileTimestampsWithIno = getFileTimestampsWithIno module.exports.getFileTimestampsWithIno = getFileTimestampsWithIno
async function getFileSize(path) { /**
var stat = await getFileStat(path) * Get file size
if (!stat) return 0 *
return stat.size || 0 * @param {string} path
* @returns {Promise<number>}
*/
module.exports.getFileSize = async (path) => {
return (await getFileStat(path))?.size || 0
}
/**
* Get file mtimeMs
*
* @param {string} path
* @returns {Promise<number>} epoch timestamp
*/
module.exports.getFileMTimeMs = async (path) => {
return (await getFileStat(path))?.mtimeMs || 0
} }
module.exports.getFileSize = getFileSize
/** /**
* *