mirror of https://github.com/advplyr/audiobookshelf.git (synced 2025-04-16 01:16:24 +02:00)

commit e85ddc1aa1 (parent b9be7510f8)
Update package.json pkg assets, remove njodb and dependencies
package.json

@@ -20,7 +20,7 @@
  "pkg": {
    "assets": [
      "client/dist/**/*",
-     "server/Db.js"
+     "node_modules/sqlite3/lib/binding/**/*.node"
    ],
    "scripts": [
      "prod.js",
@@ -44,4 +44,4 @@
  "devDependencies": {
    "nodemon": "^2.0.20"
  }
-}
+}
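
The asset swap tracks the storage change: the old njodb layer was plain JavaScript that pkg could snapshot via server/Db.js, while sqlite3 loads a prebuilt native addon at runtime, which pkg's static require() analysis cannot trace. A minimal sketch of why the .node file has to be declared explicitly (simplified; the real lookup goes through node-pre-gyp, and the exact binding path varies by platform and sqlite3 version):

    // Hypothetical illustration, not code from this commit:
    const path = require('path')
    // sqlite3 computes its binding location at runtime...
    const bindingPath = path.join(__dirname, 'lib/binding', process.platform + '-' + process.arch, 'node_sqlite3.node')
    // ...and a dynamic require like this is invisible to pkg unless the
    // file is listed under "assets" in package.json.
    const binding = require(bindingPath)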

server/Database.js

@@ -14,7 +14,7 @@ class Database {
    this.isNew = false // New absdatabase.sqlite created

    // Temporarily using format of old DB
-   // below data should be loaded from the DB as needed
+   // TODO: below data should be loaded from the DB as needed
    this.libraryItems = []
    this.users = []
    this.libraries = []

server/Db.js (454 lines deleted)

@@ -1,454 +0,0 @@
const Path = require('path')
const njodb = require('./libs/njodb')
const Logger = require('./Logger')
const { version } = require('../package.json')
const filePerms = require('./utils/filePerms')
const LibraryItem = require('./objects/LibraryItem')
const User = require('./objects/user/User')
const Collection = require('./objects/Collection')
const Playlist = require('./objects/Playlist')
const Library = require('./objects/Library')
const Author = require('./objects/entities/Author')
const Series = require('./objects/entities/Series')
const ServerSettings = require('./objects/settings/ServerSettings')
const NotificationSettings = require('./objects/settings/NotificationSettings')
const EmailSettings = require('./objects/settings/EmailSettings')
const PlaybackSession = require('./objects/PlaybackSession')

class Db {
  constructor() {
    this.LibraryItemsPath = Path.join(global.ConfigPath, 'libraryItems')
    this.UsersPath = Path.join(global.ConfigPath, 'users')
    this.SessionsPath = Path.join(global.ConfigPath, 'sessions')
    this.LibrariesPath = Path.join(global.ConfigPath, 'libraries')
    this.SettingsPath = Path.join(global.ConfigPath, 'settings')
    this.CollectionsPath = Path.join(global.ConfigPath, 'collections')
    this.PlaylistsPath = Path.join(global.ConfigPath, 'playlists')
    this.AuthorsPath = Path.join(global.ConfigPath, 'authors')
    this.SeriesPath = Path.join(global.ConfigPath, 'series')
    this.FeedsPath = Path.join(global.ConfigPath, 'feeds')

    this.libraryItemsDb = new njodb.Database(this.LibraryItemsPath, this.getNjodbOptions())
    this.usersDb = new njodb.Database(this.UsersPath, this.getNjodbOptions())
    this.sessionsDb = new njodb.Database(this.SessionsPath, this.getNjodbOptions())
    this.librariesDb = new njodb.Database(this.LibrariesPath, this.getNjodbOptions())
    this.settingsDb = new njodb.Database(this.SettingsPath, this.getNjodbOptions())
    this.collectionsDb = new njodb.Database(this.CollectionsPath, this.getNjodbOptions())
    this.playlistsDb = new njodb.Database(this.PlaylistsPath, this.getNjodbOptions())
    this.authorsDb = new njodb.Database(this.AuthorsPath, this.getNjodbOptions())
    this.seriesDb = new njodb.Database(this.SeriesPath, this.getNjodbOptions())
    this.feedsDb = new njodb.Database(this.FeedsPath, this.getNjodbOptions())

    this.libraryItems = []
    this.users = []
    this.libraries = []
    this.settings = []
    this.collections = []
    this.playlists = []
    this.authors = []
    this.series = []

    this.serverSettings = null
    this.notificationSettings = null
    this.emailSettings = null

    // Stores previous version only if upgraded
    this.previousVersion = null
  }

  get hasRootUser() {
    return this.users.some(u => u.id === 'root')
  }

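  // Note: these options are passed through njodb to proper-lockfile: 'stale' is how long
  // (ms) an existing lock is honored before it is considered abandoned, 'update' is the
  // lockfile mtime refresh interval, and 'retries' configures the acquisition backoff.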
  getNjodbOptions() {
    return {
      lockoptions: {
        stale: 1000 * 20, // 20 seconds
        update: 2500,
        retries: {
          retries: 20,
          minTimeout: 250,
          maxTimeout: 5000,
          factor: 1
        }
      }
    }
  }

  getEntityDb(entityName) {
    if (entityName === 'user') return this.usersDb
    else if (entityName === 'session') return this.sessionsDb
    else if (entityName === 'libraryItem') return this.libraryItemsDb
    else if (entityName === 'library') return this.librariesDb
    else if (entityName === 'settings') return this.settingsDb
    else if (entityName === 'collection') return this.collectionsDb
    else if (entityName === 'playlist') return this.playlistsDb
    else if (entityName === 'author') return this.authorsDb
    else if (entityName === 'series') return this.seriesDb
    else if (entityName === 'feed') return this.feedsDb
    return null
  }

  getEntityArrayKey(entityName) {
    if (entityName === 'user') return 'users'
    else if (entityName === 'session') return 'sessions'
    else if (entityName === 'libraryItem') return 'libraryItems'
    else if (entityName === 'library') return 'libraries'
    else if (entityName === 'settings') return 'settings'
    else if (entityName === 'collection') return 'collections'
    else if (entityName === 'playlist') return 'playlists'
    else if (entityName === 'author') return 'authors'
    else if (entityName === 'series') return 'series'
    else if (entityName === 'feed') return 'feeds'
    return null
  }

  reinit() {
    this.libraryItemsDb = new njodb.Database(this.LibraryItemsPath, this.getNjodbOptions())
    this.usersDb = new njodb.Database(this.UsersPath, this.getNjodbOptions())
    this.sessionsDb = new njodb.Database(this.SessionsPath, this.getNjodbOptions())
    this.librariesDb = new njodb.Database(this.LibrariesPath, this.getNjodbOptions())
    this.settingsDb = new njodb.Database(this.SettingsPath, this.getNjodbOptions())
    this.collectionsDb = new njodb.Database(this.CollectionsPath, this.getNjodbOptions())
    this.playlistsDb = new njodb.Database(this.PlaylistsPath, this.getNjodbOptions())
    this.authorsDb = new njodb.Database(this.AuthorsPath, this.getNjodbOptions())
    this.seriesDb = new njodb.Database(this.SeriesPath, this.getNjodbOptions())
    this.feedsDb = new njodb.Database(this.FeedsPath, this.getNjodbOptions())
    return this.init()
  }

  // Get previous server version before loading DB to check whether a db migration is required
  // returns null if server was not upgraded
  checkPreviousVersion() {
    return this.settingsDb.select(() => true).then((results) => {
      if (results.data && results.data.length) {
        const serverSettings = results.data.find(s => s.id === 'server-settings')
        if (serverSettings && serverSettings.version && serverSettings.version !== version) {
          return serverSettings.version
        }
      }
      return null
    })
  }

  createRootUser(username, pash, token) {
    const newRoot = new User({
      id: 'root',
      type: 'root',
      username,
      pash,
      token,
      isActive: true,
      createdAt: Date.now()
    })
    return this.insertEntity('user', newRoot)
  }

  async init() {
    await this.load()

    // Set file ownership for all files created by db
    await filePerms.setDefault(global.ConfigPath, true)

    if (!this.serverSettings) { // Create first load server settings
      this.serverSettings = new ServerSettings()
      await this.insertEntity('settings', this.serverSettings)
    }
    if (!this.notificationSettings) {
      this.notificationSettings = new NotificationSettings()
      await this.insertEntity('settings', this.notificationSettings)
    }
    if (!this.emailSettings) {
      this.emailSettings = new EmailSettings()
      await this.insertEntity('settings', this.emailSettings)
    }
    global.ServerSettings = this.serverSettings.toJSON()
  }

  async load() {
    const p1 = this.libraryItemsDb.select(() => true).then((results) => {
      this.libraryItems = results.data.map(a => new LibraryItem(a))
      Logger.info(`[DB] ${this.libraryItems.length} Library Items Loaded`)
    })
    const p2 = this.usersDb.select(() => true).then((results) => {
      this.users = results.data.map(u => new User(u))
      Logger.info(`[DB] ${this.users.length} Users Loaded`)
    })
    const p3 = this.librariesDb.select(() => true).then((results) => {
      this.libraries = results.data.map(l => new Library(l))
      this.libraries.sort((a, b) => a.displayOrder - b.displayOrder)
      Logger.info(`[DB] ${this.libraries.length} Libraries Loaded`)
    })
    const p4 = this.settingsDb.select(() => true).then(async (results) => {
      if (results.data && results.data.length) {
        this.settings = results.data
        const serverSettings = this.settings.find(s => s.id === 'server-settings')
        if (serverSettings) {
          this.serverSettings = new ServerSettings(serverSettings)

          // Check if server was upgraded
          if (!this.serverSettings.version || this.serverSettings.version !== version) {
            this.previousVersion = this.serverSettings.version || '1.0.0'

            // Library settings and server settings updated in 2.1.3 - run migration
            if (this.previousVersion.localeCompare('2.1.3') < 0) {
              Logger.info(`[Db] Running servers & library settings migration`)
              for (const library of this.libraries) {
                if (library.settings.coverAspectRatio !== serverSettings.coverAspectRatio) {
                  library.settings.coverAspectRatio = serverSettings.coverAspectRatio
                  await this.updateEntity('library', library)
                  Logger.debug(`[Db] Library ${library.name} migrated`)
                }
              }
            }
          }
        }

        const notificationSettings = this.settings.find(s => s.id === 'notification-settings')
        if (notificationSettings) {
          this.notificationSettings = new NotificationSettings(notificationSettings)
        }

        const emailSettings = this.settings.find(s => s.id === 'email-settings')
        if (emailSettings) {
          this.emailSettings = new EmailSettings(emailSettings)
        }
      }
    })
    const p5 = this.collectionsDb.select(() => true).then((results) => {
      this.collections = results.data.map(l => new Collection(l))
      Logger.info(`[DB] ${this.collections.length} Collections Loaded`)
    })
    const p6 = this.playlistsDb.select(() => true).then((results) => {
      this.playlists = results.data.map(l => new Playlist(l))
      Logger.info(`[DB] ${this.playlists.length} Playlists Loaded`)
    })
    const p7 = this.authorsDb.select(() => true).then((results) => {
      this.authors = results.data.map(l => new Author(l))
      Logger.info(`[DB] ${this.authors.length} Authors Loaded`)
    })
    const p8 = this.seriesDb.select(() => true).then((results) => {
      this.series = results.data.map(l => new Series(l))
      Logger.info(`[DB] ${this.series.length} Series Loaded`)
    })
    await Promise.all([p1, p2, p3, p4, p5, p6, p7, p8])

    // Update server version in server settings
    if (this.previousVersion) {
      this.serverSettings.version = version
      await this.updateServerSettings()
    }
  }

  getLibraryItem(id) {
    return this.libraryItems.find(li => li.id === id)
  }

  async updateLibraryItem(libraryItem) {
    return this.updateLibraryItems([libraryItem])
  }

  async updateLibraryItems(libraryItems) {
    await Promise.all(libraryItems.map(async (li) => {
      if (li && li.saveMetadata) return li.saveMetadata()
      return null
    }))

    const libraryItemIds = libraryItems.map(li => li.id)
    return this.libraryItemsDb.update((record) => libraryItemIds.includes(record.id), (record) => {
      return libraryItems.find(li => li.id === record.id)
    }).then((results) => {
      Logger.debug(`[DB] Library Items updated ${results.updated}`)
      return true
    }).catch((error) => {
      Logger.error(`[DB] Library Items update failed ${error}`)
      return false
    })
  }

  removeLibraryItem(id) {
    return this.libraryItemsDb.delete((record) => record.id === id).then((results) => {
      Logger.debug(`[DB] Deleted Library Items: ${results.deleted}`)
      this.libraryItems = this.libraryItems.filter(li => li.id !== id)
    }).catch((error) => {
      Logger.error(`[DB] Remove Library Items Failed: ${error}`)
    })
  }

  updateServerSettings() {
    global.ServerSettings = this.serverSettings.toJSON()
    return this.updateEntity('settings', this.serverSettings)
  }

  insertEntities(entityName, entities) {
    var entityDb = this.getEntityDb(entityName)
    return entityDb.insert(entities).then((results) => {
      Logger.debug(`[DB] Inserted ${results.inserted} ${entityName}`)

      var arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) this[arrayKey] = this[arrayKey].concat(entities)
      return true
    }).catch((error) => {
      Logger.error(`[DB] Failed to insert ${entityName}`, error)
      return false
    })
  }

  insertEntity(entityName, entity) {
    var entityDb = this.getEntityDb(entityName)
    return entityDb.insert([entity]).then((results) => {
      Logger.debug(`[DB] Inserted ${results.inserted} ${entityName}`)

      var arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) this[arrayKey].push(entity)
      return true
    }).catch((error) => {
      Logger.error(`[DB] Failed to insert ${entityName}`, error)
      return false
    })
  }

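  // Splits entities into sequential batches of at most `batchSize` and inserts them one
  // batch at a time, logging per-batch progress.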
  async bulkInsertEntities(entityName, entities, batchSize = 500) {
    // Group entities in batches of size batchSize
    var entityBatches = []
    var batch = []
    var index = 0
    entities.forEach((ent) => {
      batch.push(ent)
      index++
      if (index >= batchSize) {
        entityBatches.push(batch)
        index = 0
        batch = []
      }
    })
    if (batch.length) entityBatches.push(batch)

    Logger.info(`[Db] bulkInsertEntities: ${entities.length} ${entityName} to ${entityBatches.length} batches of max size ${batchSize}`)

    // Start inserting batches
    var batchIndex = 1
    for (const entityBatch of entityBatches) {
      Logger.info(`[Db] bulkInsertEntities: Start inserting batch ${batchIndex} of ${entityBatch.length} for ${entityName}`)
      var success = await this.insertEntities(entityName, entityBatch)
      if (success) {
        Logger.info(`[Db] bulkInsertEntities: Success inserting batch ${batchIndex} for ${entityName}`)
      } else {
        Logger.info(`[Db] bulkInsertEntities: Failed inserting batch ${batchIndex} for ${entityName}`)
      }
      batchIndex++
    }
    return true
  }

  updateEntities(entityName, entities) {
    var entityDb = this.getEntityDb(entityName)

    var entityIds = entities.map(ent => ent.id)
    return entityDb.update((record) => entityIds.includes(record.id), (record) => {
      return entities.find(ent => ent.id === record.id)
    }).then((results) => {
      Logger.debug(`[DB] Updated ${entityName}: ${results.updated}`)
      var arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) {
        this[arrayKey] = this[arrayKey].map(e => {
          if (entityIds.includes(e.id)) return entities.find(_e => _e.id === e.id)
          return e
        })
      }
      return true
    }).catch((error) => {
      Logger.error(`[DB] Update ${entityName} Failed: ${error}`)
      return false
    })
  }

  updateEntity(entityName, entity) {
    const entityDb = this.getEntityDb(entityName)

    let jsonEntity = entity
    if (entity && entity.toJSON) {
      jsonEntity = entity.toJSON()
    }

    return entityDb.update((record) => record.id === entity.id, () => jsonEntity).then((results) => {
      Logger.debug(`[DB] Updated ${entityName}: ${results.updated}`)
      const arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) {
        this[arrayKey] = this[arrayKey].map(e => {
          return e.id === entity.id ? entity : e
        })
      }
      return true
    }).catch((error) => {
      Logger.error(`[DB] Update entity ${entityName} Failed: ${error}`)
      return false
    })
  }

  removeEntity(entityName, entityId) {
    var entityDb = this.getEntityDb(entityName)
    return entityDb.delete((record) => {
      return record.id === entityId
    }).then((results) => {
      Logger.debug(`[DB] Deleted entity ${entityName}: ${results.deleted}`)
      var arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) {
        this[arrayKey] = this[arrayKey].filter(e => {
          return e.id !== entityId
        })
      }
    }).catch((error) => {
      Logger.error(`[DB] Remove entity ${entityName} Failed: ${error}`)
    })
  }

  removeEntities(entityName, selectFunc, silent = false) {
    var entityDb = this.getEntityDb(entityName)
    return entityDb.delete(selectFunc).then((results) => {
      if (!silent) Logger.debug(`[DB] Deleted entities ${entityName}: ${results.deleted}`)
      var arrayKey = this.getEntityArrayKey(entityName)
      if (this[arrayKey]) {
        this[arrayKey] = this[arrayKey].filter(e => {
          return !selectFunc(e)
        })
      }
      return results.deleted
    }).catch((error) => {
      Logger.error(`[DB] Remove entities ${entityName} Failed: ${error}`)
      return 0
    })
  }

  recreateLibraryItemsDb() {
    return this.libraryItemsDb.drop().then((results) => {
      Logger.info(`[DB] Dropped library items db`, results)
      this.libraryItemsDb = new njodb.Database(this.LibraryItemsPath)
      this.libraryItems = []
      return true
    }).catch((error) => {
      Logger.error(`[DB] Failed to drop library items db`, error)
      return false
    })
  }

  getPlaybackSession(id) {
    return this.sessionsDb.select((pb) => pb.id == id).then((results) => {
      if (results.data.length) {
        return new PlaybackSession(results.data[0])
      }
      return null
    }).catch((error) => {
      Logger.error('Failed to get session', error)
      return null
    })
  }

  // Check if server was updated and previous version was earlier than param
  checkPreviousVersionIsBefore(version) {
    if (!this.previousVersion) return false
    // true if version > previousVersion
    return version.localeCompare(this.previousVersion) >= 0
  }
}
module.exports = Db
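
For context, a minimal sketch of how the rest of the server consumed this class before the sqlite migration; '/config', someUser, and 'li_123' are hypothetical values, not taken from the commit:

    const Db = require('./server/Db')

    async function boot() {
      global.ConfigPath = '/config' // every njodb store path above is derived from this
      const db = new Db()
      await db.init() // loads every store into the in-memory arrays (db.users, db.libraryItems, ...)

      await db.insertEntity('user', someUser) // persists to njodb and pushes onto db.users
      const item = db.getLibraryItem('li_123') // reads only touch the in-memory copy
    }

    // Caveat carried by the old code: checkPreviousVersionIsBefore compares versions with
    // String.localeCompare, which is lexicographic, so '2.10.0'.localeCompare('2.9.0') is
    // negative and multi-digit minor/patch versions can compare in the wrong order.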

server/controllers/LibraryController.js

@@ -252,11 +252,7 @@ class LibraryController {
    }

    if (payload.sortBy) {
-     // old sort key TODO: should be mutated in dbMigration
      let sortKey = payload.sortBy
-     if (sortKey.startsWith('book.')) {
-       sortKey = sortKey.replace('book.', 'media.metadata.')
-     }

      // Handle server setting sortingIgnorePrefix
      const sortByTitle = sortKey === 'media.metadata.title'

server/libs/njodb/LICENSE (7 lines deleted)

@@ -1,7 +0,0 @@
Copyright 2021 James BonTempo (jamesbontempo@gmail.com)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

server/libs/njodb/index.js (489 lines deleted)

@@ -1,489 +0,0 @@
"use strict";

const {
    existsSync,
    mkdirSync,
    readFileSync,
    writeFileSync
} = require("graceful-fs");

const {
    join,
    resolve
} = require("path");

const {
    aggregateStoreData,
    aggregateStoreDataSync,
    distributeStoreData,
    distributeStoreDataSync,
    deleteStoreData,
    deleteStoreDataSync,
    dropEverything,
    dropEverythingSync,
    getStoreNames,
    getStoreNamesSync,
    insertStoreData,
    insertStoreDataSync,
    insertFileData,
    selectStoreData,
    selectStoreDataSync,
    statsStoreData,
    statsStoreDataSync,
    updateStoreData,
    updateStoreDataSync
} = require("./njodb");

const {
    Randomizer,
    Reducer,
    Result
} = require("./objects");

const {
    validateArray,
    validateFunction,
    validateName,
    validateObject,
    validatePath,
    validateSize
} = require("./validators");

const defaults = {
    "datadir": "data",
    "dataname": "data",
    "datastores": 5,
    "tempdir": "tmp",
    "lockoptions": {
        "stale": 5000,
        "update": 1000,
        "retries": {
            "retries": 5000,
            "minTimeout": 250,
            "maxTimeout": 5000,
            "factor": 0.15,
            "randomize": false
        }
    }
};

const mergeProperties = (defaults, userProperties) => {
    var target = Object.assign({}, defaults);

    for (let key of Object.keys(userProperties)) {
        if (Object.prototype.hasOwnProperty.call(target, key)) {
            if (typeof userProperties[key] !== 'object' && !Array.isArray(userProperties[key])) {
                Object.assign(target, { [key]: userProperties[key] });
            } else {
                target[key] = mergeProperties(target[key], userProperties[key]);
            }
        }
    }

    return target;
}
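
// Example: mergeProperties(defaults, { datastores: 2, lockoptions: { stale: 20000 } })
// keeps every default and overrides only the supplied leaves; keys that are not present
// in the defaults are ignored because of the hasOwnProperty guard.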

const saveProperties = (root, properties) => {
    properties = {
        "datadir": properties.datadir,
        "dataname": properties.dataname,
        "datastores": properties.datastores,
        "tempdir": properties.tempdir,
        "lockoptions": properties.lockoptions
    };
    const propertiesFile = join(root, "njodb.properties");
    writeFileSync(propertiesFile, JSON.stringify(properties, null, 4));
    return properties;
}

process.on("uncaughtException", error => {
    if (error.code === "ECOMPROMISED") {
        console.error(Object.assign(new Error("Stale lock or attempt to update it after release"), { code: error.code }));
    } else {
        throw error;
    }
});

class Database {

    constructor(root, properties = {}) {
        validateObject(properties);

        this.properties = {};

        if (root !== undefined && root !== null) {
            validateName(root);
            this.properties.root = root;
        } else {
            this.properties.root = process.cwd();
        }

        if (!existsSync(this.properties.root)) mkdirSync(this.properties.root);

        const propertiesFile = join(this.properties.root, "njodb.properties");

        if (existsSync(propertiesFile)) {
            this.setProperties(JSON.parse(readFileSync(propertiesFile)));
        } else {
            this.setProperties(mergeProperties(defaults, properties));
        }

        if (!existsSync(this.properties.datapath)) mkdirSync(this.properties.datapath);
        if (!existsSync(this.properties.temppath)) mkdirSync(this.properties.temppath);

        this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);

        return this;
    }

    // Database management methods

    getProperties() {
        return this.properties;
    }

    setProperties(properties) {
        validateObject(properties);

        this.properties.datadir = (validateName(properties.datadir)) ? properties.datadir : defaults.datadir;
        this.properties.dataname = (validateName(properties.dataname)) ? properties.dataname : defaults.dataname;
        this.properties.datastores = (validateSize(properties.datastores)) ? properties.datastores : defaults.datastores;
        this.properties.tempdir = (validateName(properties.tempdir)) ? properties.tempdir : defaults.tempdir;
        this.properties.lockoptions = (validateObject(properties.lockoptions)) ? properties.lockoptions : defaults.lockoptions;
        this.properties.datapath = join(this.properties.root, this.properties.datadir);
        this.properties.temppath = join(this.properties.root, this.properties.tempdir);

        saveProperties(this.properties.root, this.properties);

        return this.properties;
    }

    async stats() {
        var stats = {
            root: resolve(this.properties.root),
            data: resolve(this.properties.datapath),
            temp: resolve(this.properties.temppath)
        };

        var promises = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            promises.push(statsStoreData(storepath, this.properties.lockoptions));
        }

        const results = await Promise.all(promises);

        return Object.assign(stats, Reducer("stats", results));
    }

    statsSync() {
        var stats = {
            root: resolve(this.properties.root),
            data: resolve(this.properties.datapath),
            temp: resolve(this.properties.temppath)
        };

        var results = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            results.push(statsStoreDataSync(storepath));
        }

        return Object.assign(stats, Reducer("stats", results));
    }

    async grow() {
        this.properties.datastores++;
        const results = await distributeStoreData(this.properties);
        this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
        saveProperties(this.properties.root, this.properties);
        return results;
    }

    growSync() {
        this.properties.datastores++;
        const results = distributeStoreDataSync(this.properties);
        this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
        saveProperties(this.properties.root, this.properties);
        return results;
    }

    async shrink() {
        if (this.properties.datastores > 1) {
            this.properties.datastores--;
            const results = await distributeStoreData(this.properties);
            this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
            saveProperties(this.properties.root, this.properties);
            return results;
        } else {
            throw new Error("Database cannot shrink any further");
        }
    }

    shrinkSync() {
        if (this.properties.datastores > 1) {
            this.properties.datastores--;
            const results = distributeStoreDataSync(this.properties);
            this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
            saveProperties(this.properties.root, this.properties);
            return results;
        } else {
            throw new Error("Database cannot shrink any further");
        }
    }

    async resize(size) {
        validateSize(size);
        this.properties.datastores = size;
        const results = await distributeStoreData(this.properties);
        this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
        saveProperties(this.properties.root, this.properties);
        return results;
    }

    resizeSync(size) {
        validateSize(size);
        this.properties.datastores = size;
        const results = distributeStoreDataSync(this.properties);
        this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
        saveProperties(this.properties.root, this.properties);
        return results;
    }

    async drop() {
        const results = await dropEverything(this.properties);
        return Reducer("drop", results);
    }

    dropSync() {
        const results = dropEverythingSync(this.properties);
        return Reducer("drop", results);
    }

    // Data manipulation methods

    async insert(data) {
        validateArray(data);

        var promises = [];
        var records = [];

        for (let i = 0; i < this.properties.datastores; i++) {
            records[i] = "";
        }

        for (let i = 0; i < data.length; i++) {
            records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n";
        }

        const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false);

        for (var j = 0; j < records.length; j++) {
            if (records[j] !== "") {
                const storenumber = randomizer.next();
                const storename = [this.properties.dataname, storenumber, "json"].join(".");
                const storepath = join(this.properties.datapath, storename)
                promises.push(insertStoreData(storepath, records[j], this.properties.lockoptions));
            }
        }

        const results = await Promise.all(promises);

        this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);

        return Reducer("insert", results);
    }

    insertSync(data) {
        validateArray(data);

        var results = [];
        var records = [];

        for (let i = 0; i < this.properties.datastores; i++) {
            records[i] = "";
        }

        for (let i = 0; i < data.length; i++) {
            records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n";
        }

        const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false);

        for (var j = 0; j < records.length; j++) {
            if (records[j] !== "") {
                const storenumber = randomizer.next();
                const storename = [this.properties.dataname, storenumber, "json"].join(".");
                const storepath = join(this.properties.datapath, storename)
                results.push(insertStoreDataSync(storepath, records[j], this.properties.lockoptions));
            }
        }

        this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);

        return Reducer("insert", results);
    }

    async insertFile(file) {
        validatePath(file);

        const results = await insertFileData(file, this.properties.datapath, this.properties.storenames, this.properties.lockoptions);

        return results;
    }

    insertFileSync(file) {
        validatePath(file);

        const data = readFileSync(file, "utf8").split("\n");
        var records = [];

        var results = Result("insertFile");

        for (var record of data) {
            record = record.trim()

            results.lines++;

            if (record.length > 0) {
                try {
                    records.push(JSON.parse(record));
                } catch (error) {
                    results.errors.push({ error: error.message, line: results.lines, data: record });
                }
            } else {
                results.blanks++;
            }
        }

        return Object.assign(results, this.insertSync(records));
    }

    async select(match, project) {
        validateFunction(match);
        if (project) validateFunction(project);

        var promises = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            promises.push(selectStoreData(storepath, match, project, this.properties.lockoptions));
        }

        const results = await Promise.all(promises);
        return Reducer("select", results);
    }

    selectSync(match, project) {
        validateFunction(match);
        if (project) validateFunction(project);

        var results = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            results.push(selectStoreDataSync(storepath, match, project));
        }

        return Reducer("select", results);
    }

    async update(match, update) {
        validateFunction(match);
        validateFunction(update);

        var promises = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            const tempstorename = [storename, Date.now(), "tmp"].join(".");
            const tempstorepath = join(this.properties.temppath, tempstorename);
            promises.push(updateStoreData(storepath, match, update, tempstorepath, this.properties.lockoptions));
        }

        const results = await Promise.all(promises);
        return Reducer("update", results);
    }

    updateSync(match, update) {
        validateFunction(match);
        validateFunction(update);

        var results = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            const tempstorename = [storename, Date.now(), "tmp"].join(".");
            const tempstorepath = join(this.properties.temppath, tempstorename);
            results.push(updateStoreDataSync(storepath, match, update, tempstorepath));
        }

        return Reducer("update", results);
    }

    async delete(match) {
        validateFunction(match);

        var promises = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            const tempstorename = [storename, Date.now(), "tmp"].join(".");
            const tempstorepath = join(this.properties.temppath, tempstorename);
            promises.push(deleteStoreData(storepath, match, tempstorepath, this.properties.lockoptions));
        }

        const results = await Promise.all(promises);
        return Reducer("delete", results);
    }

    deleteSync(match) {
        validateFunction(match);

        var results = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            const tempstorename = [storename, Date.now(), "tmp"].join(".");
            const tempstorepath = join(this.properties.temppath, tempstorename);
            results.push(deleteStoreDataSync(storepath, match, tempstorepath));
        }

        return Reducer("delete", results);
    }

    async aggregate(match, index, project) {
        validateFunction(match);
        validateFunction(index);
        if (project) validateFunction(project);

        var promises = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            promises.push(aggregateStoreData(storepath, match, index, project, this.properties.lockoptions));
        }

        const results = await Promise.all(promises);
        return Reducer("aggregate", results);
    }

    aggregateSync(match, index, project) {
        validateFunction(match);
        validateFunction(index);
        if (project) validateFunction(project);

        var results = [];

        for (const storename of this.properties.storenames) {
            const storepath = join(this.properties.datapath, storename);
            results.push(aggregateStoreDataSync(storepath, match, index, project));
        }

        return Reducer("aggregate", results);
    }
}

exports.Database = Database;
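
The njodb API removed here stored each collection as newline-delimited JSON sharded across several datastore files. A minimal usage sketch; the path and records are hypothetical, not from the commit:

    const njodb = require('./server/libs/njodb')

    async function demo() {
      const db = new njodb.Database('/config/users') // writes njodb.properties, creates data/ and tmp/
      await db.insert([{ id: 'root', type: 'root' }]) // shards NDJSON lines across the stores
      const { data } = await db.select((r) => r.type === 'root') // scans every store under a file lock
      await db.update((r) => r.id === 'root', (r) => ({ ...r, isActive: true }))
      await db.delete((r) => r.id === 'root')
      console.log(data)
    }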

server/libs/njodb/njodb.js (723 lines deleted)

@@ -1,723 +0,0 @@
"use strict";

const {
    appendFile,
    appendFileSync,
    createReadStream,
    createWriteStream,
    readFileSync,
    readdir,
    readdirSync,
    stat,
    statSync,
    writeFile
} = require("graceful-fs");

const {
    join,
    resolve
} = require("path");

const { createInterface } = require("readline");

const { promisify } = require("util");

const {
    check,
    checkSync,
    lock,
    lockSync
} = require("../properLockfile");

const {
    deleteFile,
    deleteFileSync,
    deleteDirectory,
    deleteDirectorySync,
    fileExists,
    fileExistsSync,
    moveFile,
    moveFileSync,
    releaseLock,
    releaseLockSync,
    replaceFile,
    replaceFileSync
} = require("./utils");

const {
    Handler,
    Randomizer,
    Result
} = require("./objects");

const filterStoreNames = (files, dataname) => {
    var storenames = [];
    const re = new RegExp("^" + [dataname, "\\d+", "json"].join(".") + "$");
    for (const file of files) {
        if (re.test(file)) storenames.push(file);
    }
    return storenames;
};

const getStoreNames = async (datapath, dataname) => {
    const files = await promisify(readdir)(datapath);
    return filterStoreNames(files, dataname);
}

const getStoreNamesSync = (datapath, dataname) => {
    const files = readdirSync(datapath);
    return filterStoreNames(files, dataname);
};

// Database management

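// statsStoreData streams a datastore line by line under a file lock, folds per-record
// stats (records, errors, blanks) through the "stats" Handler, then stamps file size
// and timestamps onto the result.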
const statsStoreData = async (store, lockoptions) => {
    var release, stats, results;

    release = await lock(store, lockoptions);

    const handlerResults = await new Promise((resolve, reject) => {
        const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
        const handler = Handler("stats");

        reader.on("line", record => handler.next(record));
        reader.on("close", () => resolve(handler.return()));
        reader.on("error", error => reject(error));
    });

    if (await check(store, lockoptions)) await releaseLock(store, release);

    results = Object.assign({ store: resolve(store) }, handlerResults)

    stats = await promisify(stat)(store);
    results.size = stats.size;
    results.created = stats.birthtime;
    results.modified = stats.mtime;

    results.end = Date.now()

    return results;
};

const statsStoreDataSync = (store) => {
    var file, release, results;

    release = lockSync(store);
    file = readFileSync(store, "utf8");

    if (checkSync(store)) releaseLockSync(store, release);

    const data = file.split("\n");
    const handler = Handler("stats");

    for (var record of data) {
        handler.next(record)
    }

    results = Object.assign({ store: resolve(store) }, handler.return());

    const stats = statSync(store);
    results.size = stats.size;
    results.created = stats.birthtime;
    results.modified = stats.mtime;

    results.end = Date.now();

    return results;
};

const distributeStoreData = async (properties) => {
    var results = Result("distribute");

    var storepaths = [];
    var tempstorepaths = [];

    var locks = [];

    for (let storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        locks.push(lock(storepath, properties.lockoptions));
    }

    const releases = await Promise.all(locks);

    var writes = [];
    var writers = [];

    for (let i = 0; i < properties.datastores; i++) {
        const tempstorepath = join(properties.temppath, [properties.dataname, i, results.start, "json"].join("."));
        tempstorepaths.push(tempstorepath);
        await promisify(writeFile)(tempstorepath, "");
        writers.push(createWriteStream(tempstorepath, { flags: "r+" }));
    }

    for (let storename of properties.storenames) {
        writes.push(new Promise((resolve, reject) => {
            var line = 0;
            const store = join(properties.datapath, storename);
            const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);
            const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });

            reader.on("line", record => {
                const storenumber = randomizer.next();

                line++;
                try {
                    record = JSON.stringify(JSON.parse(record));
                    results.records++;
                } catch {
                    results.errors.push({ line: line, data: record });
                } finally {
                    writers[storenumber].write(record + "\n");
                }
            });

            reader.on("close", () => {
                resolve(true);
            });

            reader.on("error", error => {
                reject(error);
            });
        }));
    }

    await Promise.all(writes);

    for (let writer of writers) {
        writer.end();
    }

    var deletes = [];

    for (let storepath of storepaths) {
        deletes.push(deleteFile(storepath));
    }

    await Promise.all(deletes);

    for (const release of releases) {
        release();
    }

    var moves = [];

    for (let i = 0; i < tempstorepaths.length; i++) {
        moves.push(moveFile(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join("."))))
    }

    await Promise.all(moves);

    results.stores = tempstorepaths.length,
    results.end = Date.now();
    results.elapsed = results.end - results.start;

    return results;

};

const distributeStoreDataSync = (properties) => {
    var results = Result("distribute");

    var storepaths = [];
    var tempstorepaths = [];

    var releases = [];
    var data = [];

    for (let storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        releases.push(lockSync(storepath));
        const file = readFileSync(storepath, "utf8").trimEnd();
        if (file.length > 0) data = data.concat(file.split("\n"));
    }

    var records = [];

    for (var i = 0; i < data.length; i++) {
        try {
            data[i] = JSON.stringify(JSON.parse(data[i]));
            results.records++;
        } catch (error) {
            results.errors.push({ line: i, data: data[i] });
        } finally {
            if (i === i % properties.datastores) records[i] = [];
            records[i % properties.datastores] += data[i] + "\n";
        }

    }

    const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);

    for (var j = 0; j < records.length; j++) {
        const storenumber = randomizer.next();
        const tempstorepath = join(properties.temppath, [properties.dataname, storenumber, results.start, "json"].join("."));
        tempstorepaths.push(tempstorepath);
        appendFileSync(tempstorepath, records[j]);
    }

    for (let storepath of storepaths) {
        deleteFileSync(storepath);
    }

    for (const release of releases) {
        release();
    }

    for (let i = 0; i < tempstorepaths.length; i++) {
        moveFileSync(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join(".")));
    }

    results.stores = tempstorepaths.length,
    results.end = Date.now();
    results.elapsed = results.end - results.start;

    return results;

};

const dropEverything = async (properties) => {
    var locks = [];

    for (let storename of properties.storenames) {
        locks.push(lock(join(properties.datapath, storename), properties.lockoptions));
    }

    const releases = await Promise.all(locks);

    var deletes = [];

    for (let storename of properties.storenames) {
        deletes.push(deleteFile(join(properties.datapath, storename)));
    }

    var results = await Promise.all(deletes);

    for (const release of releases) {
        release();
    }

    deletes = [
        deleteDirectory(properties.temppath),
        deleteDirectory(properties.datapath),
        deleteFile(join(properties.root, "njodb.properties"))
    ];

    results = results.concat(await Promise.all(deletes));

    return results;
}

const dropEverythingSync = (properties) => {
    var results = [];
    var releases = [];

    for (let storename of properties.storenames) {
        releases.push(lockSync(join(properties.datapath, storename)));
    }

    for (let storename of properties.storenames) {
        results.push(deleteFileSync(join(properties.datapath, storename)));
    }

    for (const release of releases) {
        release();
    }

    results.push(deleteDirectorySync(properties.temppath));
    results.push(deleteDirectorySync(properties.datapath));
    results.push(deleteFileSync(join(properties.root, "njodb.properties")));

    return results;
}

// Data manipulation

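// insertStoreData appends pre-serialized, newline-delimited records to a store under a
// lock; 'inserted' is derived from the number of newlines in the payload.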
const insertStoreData = async (store, data, lockoptions) => {
    let release, results;

    results = Object.assign({ store: resolve(store) }, Result("insert"));

    if (await fileExists(store)) release = await lock(store, lockoptions);

    await promisify(appendFile)(store, data, "utf8");

    if (await check(store, lockoptions)) await releaseLock(store, release);

    results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0;
    results.end = Date.now();

    return results;
};

const insertStoreDataSync = (store, data) => {
    let release, results;

    results = Object.assign({ store: resolve(store) }, Result("insert"));

    if (fileExistsSync(store)) release = lockSync(store);

    appendFileSync(store, data, "utf8");

    if (checkSync(store)) releaseLockSync(store, release);

    results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0;
    results.end = Date.now();

    return results;
};

const insertFileData = async (file, datapath, storenames, lockoptions) => {
    let datastores, locks, releases, writers, results;

    results = Result("insertFile");

    datastores = storenames.length;
    locks = [];
    writers = [];

    for (let storename of storenames) {
        const storepath = join(datapath, storename);
        locks.push(lock(storepath, lockoptions));
        writers.push(createWriteStream(storepath, { flags: "r+" }));
    }

    releases = await Promise.all(locks);

    await new Promise((resolve, reject) => {
        const randomizer = Randomizer(Array.from(Array(datastores).keys()), false);
        const reader = createInterface({ input: createReadStream(file), crlfDelay: Infinity });

        reader.on("line", record => {
            record = record.trim();

            const storenumber = randomizer.next();
            results.lines++;

            if (record.length > 0) {
                try {
                    record = JSON.parse(record);
                    results.inserted++;
                } catch (error) {
                    results.errors.push({ error: error.message, line: results.lines, data: record });
                } finally {
                    writers[storenumber].write(JSON.stringify(record) + "\n");
                }
            } else {
                results.blanks++;
            }
        });

        reader.on("close", () => {
            resolve(true);
        });

        reader.on("error", error => {
            reject(error);
        });
    });

    for (const writer of writers) {
        writer.end();
    }

    for (const release of releases) {
        release();
    }

    results.end = Date.now();
    results.elapsed = results.end - results.start;

    return results;
}

const selectStoreData = async (store, match, project, lockoptions) => {
    let release, results;

    release = await lock(store, lockoptions);

    const handlerResults = await new Promise((resolve, reject) => {
        const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
        const handler = Handler("select", match, project);

        reader.on("line", record => handler.next(record));
        reader.on("close", () => resolve(handler.return()));
        reader.on("error", error => reject(error));
    });

    if (await check(store, lockoptions)) await releaseLock(store, release);

    results = Object.assign({ store: store }, handlerResults);

    return results;
};

const selectStoreDataSync = (store, match, project) => {
    let file, release, results;

    release = lockSync(store);

    file = readFileSync(store, "utf8");

    if (checkSync(store)) releaseLockSync(store, release);

    const records = file.split("\n");
    const handler = Handler("select", match, project);

    for (var record of records) {
        handler.next(record);
    }

    results = Object.assign({ store: store }, handler.return());

    return results;
};

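// Update and delete rewrite a store through a temp file: records stream past the handler,
// survivors and updated records are written to tempstore, and if anything changed the temp
// file replaces the original; otherwise it is deleted.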
const updateStoreData = async (store, match, update, tempstore, lockoptions) => {
|
||||
let release, results;
|
||||
|
||||
release = await lock(store, lockoptions);
|
||||
|
||||
const handlerResults = await new Promise((resolve, reject) => {
|
||||
|
||||
const writer = createWriteStream(tempstore);
|
||||
const handler = Handler("update", match, update);
|
||||
|
||||
writer.on("open", () => {
|
||||
// Reader was opening and closing before writer ever opened
|
||||
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
|
||||
|
||||
reader.on("line", record => {
|
||||
handler.next(record, writer)
|
||||
});
|
||||
|
||||
reader.on("close", () => {
|
||||
writer.end();
|
||||
resolve(handler.return());
|
||||
});
|
||||
|
||||
reader.on("error", error => reject(error));
|
||||
});
|
||||
|
||||
writer.on("error", error => reject(error));
|
||||
});
|
||||
|
||||
results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);
|
||||
|
||||
if (results.updated > 0) {
|
||||
if (!await replaceFile(store, tempstore)) {
|
||||
results.errors = [...results.records];
|
||||
results.updated = 0;
|
||||
}
|
||||
} else {
|
||||
await deleteFile(tempstore);
|
||||
}
|
||||
|
||||
if (await check(store, lockoptions)) await releaseLock(store, release);
|
||||
|
||||
results.end = Date.now();
|
||||
delete results.data;
|
||||
delete results.records;
|
||||
|
||||
return results;
|
||||
};
|
||||
|
||||
const updateStoreDataSync = (store, match, update, tempstore) => {
|
||||
let file, release, results;
|
||||
|
||||
release = lockSync(store);
|
||||
file = readFileSync(store, "utf8").trimEnd();
|
||||
|
||||
if (checkSync(store)) releaseLockSync(store, release);
|
||||
|
||||
|
||||
const records = file.split("\n");
|
||||
const handler = Handler("update", match, update);
|
||||
|
||||
for (var record of records) {
|
||||
handler.next(record);
|
||||
}
|
||||
|
||||
results = Object.assign({ store: store, tempstore: tempstore }, handler.return());
|
||||
|
||||
if (results.updated > 0) {
|
||||
let append, replace;
|
||||
|
||||
try {
|
||||
appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
|
||||
append = true;
|
||||
} catch {
|
||||
append = false;
|
||||
}
|
||||
|
||||
if (append) replace = replaceFileSync(store, tempstore);
|
||||
|
||||
if (!(append || replace)) {
|
||||
results.errors = [...results.records];
|
||||
results.updated = 0;
|
||||
}
|
||||
}
|
||||
|
||||
results.end = Date.now();
|
||||
delete results.data;
|
||||
delete results.records;
|
||||
|
||||
return results;
|
||||
|
||||
};
|
||||
|
||||
const deleteStoreData = async (store, match, tempstore, lockoptions) => {
|
||||
let release, results;
|
||||
release = await lock(store, lockoptions);
|
||||
|
||||
const handlerResults = await new Promise((resolve, reject) => {
|
||||
const writer = createWriteStream(tempstore);
|
||||
const handler = Handler("delete", match);
|
||||
|
||||
writer.on("open", () => {
|
||||
// Create reader after writer opens otherwise the reader can sometimes close before the writer opens
|
||||
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
|
||||
|
||||
reader.on("line", record => handler.next(record, writer));
|
||||
|
||||
reader.on("close", () => {
|
||||
writer.end();
|
||||
resolve(handler.return());
|
||||
});
|
||||
|
||||
reader.on("error", error => reject(error));
|
||||
});
|
||||
|
||||
writer.on("error", error => reject(error));
|
||||
});
|
||||
|
||||
results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);
|
||||
|
||||
if (results.deleted > 0) {
|
||||
if (!await replaceFile(store, tempstore)) {
|
||||
results.errors = [...results.records];
|
||||
results.deleted = 0;
|
||||
}
|
||||
} else {
|
||||
await deleteFile(tempstore);
|
||||
}
|
||||
|
||||
if (await check(store, lockoptions)) await releaseLock(store, release);
|
||||
|
||||
results.end = Date.now();
|
||||
delete results.data;
|
||||
delete results.records;
|
||||
|
||||
return results;
|
||||
|
||||
};
|
||||
|
||||
const deleteStoreDataSync = (store, match, tempstore) => {
|
||||
let file, release, results;
|
||||
|
||||
release = lockSync(store);
|
||||
file = readFileSync(store, "utf8");
|
||||
|
||||
if (checkSync(store)) releaseLockSync(store, release);
|
||||
|
||||
const records = file.split("\n");
|
||||
const handler = Handler("delete", match);
|
||||
|
||||
for (var record of records) {
|
||||
handler.next(record)
|
||||
}
|
||||
|
||||
results = Object.assign({ store: store, tempstore: tempstore }, handler.return());
|
||||
|
||||
if (results.deleted > 0) {
|
||||
let append, replace;
|
||||
|
||||
try {
|
||||
appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
|
||||
append = true;
|
||||
} catch {
|
||||
append = false;
|
||||
}
|
||||
|
||||
if (append) replace = replaceFileSync(store, tempstore);
|
||||
|
||||
if (!(append || replace)) {
|
||||
results.errors = [...results.records];
|
||||
results.updated = 0;
|
||||
}
|
||||
}
|
||||
|
||||
results.end = Date.now();
|
||||
delete results.data;
|
||||
delete results.records;
|
||||
|
||||
return results;
|
||||
};
|
||||
|
||||
const aggregateStoreData = async (store, match, index, project, lockoptions) => {
|
||||
let release, results;
|
||||
|
||||
release = await lock(store, lockoptions);
|
||||
|
||||
const handlerResults = await new Promise((resolve, reject) => {
|
||||
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
|
||||
const handler = Handler("aggregate", match, index, project);
|
||||
|
||||
reader.on("line", record => handler.next(record));
|
||||
reader.on("close", () => resolve(handler.return()));
|
||||
reader.on("error", error => reject(error));
|
||||
});
|
||||
|
||||
if (await check(store, lockoptions)) releaseLock(store, release);
|
||||
|
||||
results = Object.assign({ store: store }, handlerResults);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
const aggregateStoreDataSync = (store, match, index, project) => {
  let file, release, results;

  release = lockSync(store);
  file = readFileSync(store, "utf8");

  if (checkSync(store)) releaseLockSync(store, release);

  const records = file.split("\n");
  const handler = Handler("aggregate", match, index, project);

  for (var record of records) {
    handler.next(record);
  }

  results = Object.assign({ store: store }, handler.return());

  return results;
};

exports.getStoreNames = getStoreNames;
exports.getStoreNamesSync = getStoreNamesSync;

// Database management
exports.statsStoreData = statsStoreData;
exports.statsStoreDataSync = statsStoreDataSync;
exports.distributeStoreData = distributeStoreData;
exports.distributeStoreDataSync = distributeStoreDataSync;
exports.dropEverything = dropEverything;
exports.dropEverythingSync = dropEverythingSync;

// Data manipulation
exports.insertStoreData = insertStoreData;
exports.insertStoreDataSync = insertStoreDataSync;
exports.insertFileData = insertFileData;
exports.selectStoreData = selectStoreData;
exports.selectStoreDataSync = selectStoreDataSync;
exports.updateStoreData = updateStoreData;
exports.updateStoreDataSync = updateStoreDataSync;
exports.deleteStoreData = deleteStoreData;
exports.deleteStoreDataSync = deleteStoreDataSync;
exports.aggregateStoreData = aggregateStoreData;
exports.aggregateStoreDataSync = aggregateStoreDataSync;
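
// Usage sketch (illustrative, not part of the original file; the paths and
// selecter are made up, but the argument shapes follow the handlers above):
// each store is a file of newline-delimited JSON records that is streamed,
// filtered, and atomically swapped back into place.
// const results = await deleteStoreData(
//   '/config/sessions/data.0.json',      // store to stream through
//   record => record.userId === 'abc',   // match: selecter returning a boolean
//   '/config/sessions/data.0.json.tmp',  // tempstore, swapped in via replaceFile
//   {}                                   // lockoptions forwarded to lock()
// );
// results => { store, tempstore, lines, deleted, retained, errors, blanks, start, end, elapsed }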

@ -1,608 +0,0 @@

"use strict";
|
||||
|
||||
const {
|
||||
convertSize,
|
||||
max,
|
||||
min
|
||||
} = require("./utils");
|
||||
|
||||
const Randomizer = (data, replacement) => {
|
||||
var mutable = [...data];
|
||||
if (replacement === undefined || typeof replacement !== "boolean") replacement = true;
|
||||
|
||||
function _next() {
|
||||
var selection;
|
||||
const index = Math.floor(Math.random() * mutable.length);
|
||||
|
||||
if (replacement) {
|
||||
selection = mutable.slice(index, index + 1)[0];
|
||||
} else {
|
||||
selection = mutable.splice(index, 1)[0];
|
||||
if (mutable.length === 0) mutable = [...data];
|
||||
}
|
||||
|
||||
return selection;
|
||||
}
|
||||
|
||||
return {
|
||||
next: _next
|
||||
};
|
||||
};
|
||||
|
||||
const Result = (type) => {
  var _result;

  switch (type) {
    case "stats":
      _result = {
        size: 0,
        lines: 0,
        records: 0,
        errors: [],
        blanks: 0,
        created: undefined,
        modified: undefined,
        start: Date.now(),
        end: undefined,
        elapsed: 0
      };
      break;
    case "distribute":
      _result = {
        stores: undefined,
        records: 0,
        errors: [],
        start: Date.now(),
        end: undefined,
        elapsed: undefined
      };
      break;
    case "insert":
      _result = {
        inserted: 0,
        start: Date.now(),
        end: undefined,
        elapsed: 0
      };
      break;
    case "insertFile":
      _result = {
        lines: 0,
        inserted: 0,
        errors: [],
        blanks: 0,
        start: Date.now(),
        end: undefined
      };
      break;
    case "select":
      _result = {
        lines: 0,
        selected: 0,
        ignored: 0,
        errors: [],
        blanks: 0,
        start: Date.now(),
        end: undefined,
        elapsed: 0,
        data: []
      };
      break;
    case "update":
      _result = {
        lines: 0,
        selected: 0,
        updated: 0,
        unchanged: 0,
        errors: [],
        blanks: 0,
        start: Date.now(),
        end: undefined,
        elapsed: 0,
        data: [],
        records: []
      };
      break;
    case "delete":
      _result = {
        lines: 0,
        deleted: 0,
        retained: 0,
        errors: [],
        blanks: 0,
        start: Date.now(),
        end: undefined,
        elapsed: 0,
        data: [],
        records: []
      };
      break;
    case "aggregate":
      _result = {
        lines: 0,
        aggregates: {},
        indexed: 0,
        unindexed: 0,
        errors: [],
        blanks: 0,
        start: Date.now(),
        end: undefined,
        elapsed: 0
      };
      break;
  }

  return _result;
};

const Reduce = (type) => {
  var _reduce;

  switch (type) {
    case "stats":
      _reduce = Object.assign(Result("stats"), {
        stores: 0,
        min: undefined,
        max: undefined,
        mean: undefined,
        var: undefined,
        std: undefined,
        m2: 0
      });
      break;
    case "drop":
      _reduce = {
        dropped: false,
        start: Date.now(),
        end: 0,
        elapsed: 0
      };
      break;
    case "aggregate":
      _reduce = Object.assign(Result("aggregate"), {
        data: []
      });
      break;
    default:
      _reduce = Result(type);
      break;
  }

  _reduce.details = undefined;

  return _reduce;
};

const Handler = (type, ...functions) => {
  var _results = Result(type);

  const _next = (record, writer) => {
    record = new Record(record);
    _results.lines++;

    if (record.length === 0) {
      _results.blanks++;
    } else {
      if (record.data) {
        switch (type) {
          case "stats":
            statsHandler(record, _results);
            break;
          case "select":
            selectHandler(record, functions[0], functions[1], _results);
            break;
          case "update":
            updateHandler(record, functions[0], functions[1], writer, _results);
            break;
          case "delete":
            deleteHandler(record, functions[0], writer, _results);
            break;
          case "aggregate":
            aggregateHandler(record, functions[0], functions[1], functions[2], _results);
            break;
        }
      } else {
        _results.errors.push({ error: record.error, line: _results.lines, data: record.source });

        if (type === "update" || type === "delete") {
          if (writer) {
            writer.write(record.source + "\n");
          } else {
            _results.data.push(record.source);
          }
        }
      }
    }
  };

  const _return = () => {
    _results.end = Date.now();
    _results.elapsed = _results.end - _results.start;
    return _results;
  };

  return {
    next: _next,
    return: _return
  };
};

const statsHandler = (record, results) => {
  results.records++;
  return results;
};

const selectHandler = (record, selecter, projecter, results) => {
  if (record.select(selecter)) {
    if (projecter) {
      results.data.push(record.project(projecter));
    } else {
      results.data.push(record.data);
    }
    results.selected++;
  } else {
    results.ignored++;
  }
};

const updateHandler = (record, selecter, updater, writer, results) => {
  if (record.select(selecter)) {
    results.selected++;
    if (record.update(updater)) {
      results.updated++;
      results.records.push(record.data);
    } else {
      results.unchanged++;
    }
  } else {
    results.unchanged++;
  }

  if (writer) {
    writer.write(JSON.stringify(record.data) + "\n");
  } else {
    results.data.push(JSON.stringify(record.data));
  }
};

const deleteHandler = (record, selecter, writer, results) => {
  if (record.select(selecter)) {
    results.deleted++;
    results.records.push(record.data);
  } else {
    results.retained++;

    if (writer) {
      writer.write(JSON.stringify(record.data) + "\n");
    } else {
      results.data.push(JSON.stringify(record.data));
    }
  }
};

const aggregateHandler = (record, selecter, indexer, projecter, results) => {
  if (record.select(selecter)) {
    const index = record.index(indexer);

    if (!index) {
      results.unindexed++;
    } else {
      var projection;
      var fields;

      if (results.aggregates[index]) {
        results.aggregates[index].count++;
      } else {
        results.aggregates[index] = {
          count: 1,
          aggregates: {}
        };
      }

      if (projecter) {
        projection = record.project(projecter);
        fields = Object.keys(projection);
      } else {
        projection = record.data;
        fields = Object.keys(record.data);
      }

      for (const field of fields) {
        if (projection[field] !== undefined) {
          if (results.aggregates[index].aggregates[field]) {
            accumulateAggregate(results.aggregates[index].aggregates[field], projection[field]);
          } else {
            results.aggregates[index].aggregates[field] = {
              min: projection[field],
              max: projection[field],
              count: 1
            };
            if (typeof projection[field] === "number") {
              results.aggregates[index].aggregates[field]["sum"] = projection[field];
              results.aggregates[index].aggregates[field]["mean"] = projection[field];
              results.aggregates[index].aggregates[field]["m2"] = 0;
            }
          }
        }
      }

      results.indexed++;
    }
  }
};

const accumulateAggregate = (index, projection) => {
  index["min"] = min(index["min"], projection);
  index["max"] = max(index["max"], projection);
  index["count"]++;

  // Welford's algorithm
  if (typeof projection === "number") {
    const delta1 = projection - index["mean"];
    index["sum"] += projection;
    index["mean"] += delta1 / index["count"];
    const delta2 = projection - index["mean"];
    index["m2"] += delta1 * delta2;
  }

  return index;
};
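
// Illustrative check (not part of the original file): Welford's update keeps
// count/mean/m2 so that variance = m2 / count falls out of a single streaming
// pass, with no second pass over the data. For the values [5, 7, 3, 9]:
// const acc = { min: 5, max: 5, count: 1, sum: 5, mean: 5, m2: 0 };
// [7, 3, 9].forEach(x => accumulateAggregate(acc, x));
// acc => { min: 3, max: 9, count: 4, sum: 24, mean: 6, m2: 20 }
// so the population variance is acc.m2 / acc.count === 5, matching the
// two-pass result.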

class Record {
  constructor(record) {
    this.source = record.trim();
    this.length = this.source.length;
    this.data = {};
    this.error = "";

    try {
      this.data = JSON.parse(this.source);
    } catch (e) {
      this.data = undefined;
      this.error = e.message;
    }
  }
}

Record.prototype.select = function (selecter) {
  var result;

  try {
    result = selecter(this.data);
  } catch {
    return false;
  }

  if (typeof result !== "boolean") {
    throw new TypeError("Selecter must return a boolean");
  } else {
    return result;
  }
};

Record.prototype.update = function (updater) {
  var result;

  try {
    result = updater(this.data);
  } catch {
    return false;
  }

  if (typeof result !== "object") {
    throw new TypeError("Updater must return an object");
  } else {
    this.data = result;
    return true;
  }
};

Record.prototype.project = function (projecter) {
  var result;

  try {
    result = projecter(this.data);
  } catch {
    return undefined;
  }

  if (Array.isArray(result) || typeof result !== "object") {
    throw new TypeError("Projecter must return an object");
  } else {
    return result;
  }
};

Record.prototype.index = function (indexer) {
  try {
    return indexer(this.data);
  } catch {
    return undefined;
  }
};

const Reducer = (type, results) => {
  var _reduce = Reduce(type);

  var i = 0;
  var aggregates = {};

  for (const result of results) {
    switch (type) {
      case "stats":
        statsReducer(_reduce, result, i);
        break;
      case "insert":
        insertReducer(_reduce, result);
        break;
      case "select":
        selectReducer(_reduce, result);
        break;
      case "update":
        updateReducer(_reduce, result);
        break;
      case "delete":
        deleteReducer(_reduce, result);
        break;
      case "aggregate":
        aggregateReducer(_reduce, result, aggregates);
        break;
    }

    if (type === "stats") {
      _reduce.stores++;
      i++;
    }

    if (type === "drop") {
      _reduce.dropped = true;
    } else if (type !== "insert") {
      _reduce.lines += result.lines;
      _reduce.errors = _reduce.errors.concat(result.errors);
      _reduce.blanks += result.blanks;
    }

    _reduce.start = min(_reduce.start, result.start);
    _reduce.end = max(_reduce.end, result.end);
  }

  if (type === "stats") {
    _reduce.size = convertSize(_reduce.size);
    _reduce.var = _reduce.m2 / (results.length);
    _reduce.std = Math.sqrt(_reduce.m2 / (results.length));
    delete _reduce.m2;
  } else if (type === "aggregate") {
    for (const index of Object.keys(aggregates)) {
      var aggregate = {
        index: index,
        count: aggregates[index].count,
        aggregates: []
      };
      for (const field of Object.keys(aggregates[index].aggregates)) {
        delete aggregates[index].aggregates[field].m2;
        aggregate.aggregates.push({ field: field, data: aggregates[index].aggregates[field] });
      }
      _reduce.data.push(aggregate);
    }
    delete _reduce.aggregates;
  }

  _reduce.elapsed = _reduce.end - _reduce.start;
  _reduce.details = results;

  return _reduce;
};

const statsReducer = (reduce, result, i) => {
  reduce.size += result.size;
  reduce.records += result.records;
  reduce.min = min(reduce.min, result.records);
  reduce.max = max(reduce.max, result.records);
  if (reduce.mean === undefined) reduce.mean = result.records;
  const delta1 = result.records - reduce.mean;
  reduce.mean += delta1 / (i + 2);
  const delta2 = result.records - reduce.mean;
  reduce.m2 += delta1 * delta2;
  reduce.created = min(reduce.created, result.created);
  reduce.modified = max(reduce.modified, result.modified);
};

const insertReducer = (reduce, result) => {
  reduce.inserted += result.inserted;
};

const selectReducer = (reduce, result) => {
  reduce.selected += result.selected;
  reduce.ignored += result.ignored;
  reduce.data = reduce.data.concat(result.data);
  delete result.data;
};

const updateReducer = (reduce, result) => {
  reduce.selected += result.selected;
  reduce.updated += result.updated;
  reduce.unchanged += result.unchanged;
};

const deleteReducer = (reduce, result) => {
  reduce.deleted += result.deleted;
  reduce.retained += result.retained;
};

const aggregateReducer = (reduce, result, aggregates) => {
  reduce.indexed += result.indexed;
  reduce.unindexed += result.unindexed;

  const indexes = Object.keys(result.aggregates);

  for (const index of indexes) {
    if (aggregates[index]) {
      aggregates[index].count += result.aggregates[index].count;
    } else {
      aggregates[index] = {
        count: result.aggregates[index].count,
        aggregates: {}
      };
    }

    const fields = Object.keys(result.aggregates[index].aggregates);

    for (const field of fields) {
      const aggregateObject = aggregates[index].aggregates[field];
      const resultObject = result.aggregates[index].aggregates[field];

      if (aggregateObject) {
        reduceAggregate(aggregateObject, resultObject);
      } else {
        aggregates[index].aggregates[field] = {
          min: resultObject["min"],
          max: resultObject["max"],
          count: resultObject["count"]
        };

        if (resultObject["m2"] !== undefined) {
          aggregates[index].aggregates[field]["sum"] = resultObject["sum"];
          aggregates[index].aggregates[field]["mean"] = resultObject["mean"];
          aggregates[index].aggregates[field]["varp"] = resultObject["m2"] / resultObject["count"];
          aggregates[index].aggregates[field]["vars"] = resultObject["m2"] / (resultObject["count"] - 1);
          aggregates[index].aggregates[field]["stdp"] = Math.sqrt(resultObject["m2"] / resultObject["count"]);
          aggregates[index].aggregates[field]["stds"] = Math.sqrt(resultObject["m2"] / (resultObject["count"] - 1));
          aggregates[index].aggregates[field]["m2"] = resultObject["m2"];
        }
      }
    }
  }

  delete result.aggregates;
};

const reduceAggregate = (aggregate, result) => {
  const n = aggregate["count"] + result["count"];

  aggregate["min"] = min(aggregate["min"], result["min"]);
  aggregate["max"] = max(aggregate["max"], result["max"]);

  // Parallel version of Welford's algorithm
  if (result["m2"] !== undefined) {
    const delta = result["mean"] - aggregate["mean"];
    const m2 = aggregate["m2"] + result["m2"] + (Math.pow(delta, 2) * ((aggregate["count"] * result["count"]) / n));
    aggregate["m2"] = m2;
    aggregate["varp"] = m2 / n;
    aggregate["vars"] = m2 / (n - 1);
    aggregate["stdp"] = Math.sqrt(m2 / n);
    aggregate["stds"] = Math.sqrt(m2 / (n - 1));
  }

  if (result["sum"] !== undefined) {
    aggregate["mean"] = (aggregate["sum"] + result["sum"]) / n;
    aggregate["sum"] += result["sum"];
  }

  aggregate["count"] = n;
};
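
// Illustrative check (not part of the original file): merging per-store
// accumulators for [5, 7] and [3, 9] reproduces the single-pass statistics
// for [5, 7, 3, 9]; the delta-squared correction term accounts for the two
// partitions having (possibly) different means.
// const a = { min: 5, max: 7, count: 2, sum: 12, mean: 6, m2: 2 };
// const b = { min: 3, max: 9, count: 2, sum: 12, mean: 6, m2: 18 };
// reduceAggregate(a, b);
// a => { min: 3, max: 9, count: 4, sum: 24, mean: 6, m2: 20, varp: 5, ... }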

exports.Randomizer = Randomizer;
exports.Result = Result;
exports.Reduce = Reduce;
exports.Handler = Handler;
exports.Reducer = Reducer;

@ -1,178 +0,0 @@

"use strict";
|
||||
|
||||
const {
|
||||
access,
|
||||
constants,
|
||||
existsSync,
|
||||
rename,
|
||||
renameSync,
|
||||
rmdir,
|
||||
rmdirSync,
|
||||
unlink,
|
||||
unlinkSync
|
||||
} = require("graceful-fs");
|
||||
|
||||
const { promisify } = require("util");
|
||||
|
||||
const min = (a, b) => {
|
||||
if (b === undefined || a <= b) return a;
|
||||
return b;
|
||||
};
|
||||
|
||||
const max = (a, b) => {
|
||||
if (b === undefined || a > b) return a;
|
||||
return b;
|
||||
};
|
||||
|
||||
const convertSize = (size) => {
|
||||
const sizes = ["bytes", "KB", "MB", "GB"];
|
||||
|
||||
var index = Math.floor(Math.log2(size) / 10);
|
||||
if (index > 3) index = 3;
|
||||
|
||||
return Math.round(((size / Math.pow(1024, index)) + Number.EPSILON) * 100) / 100 + " " + sizes[index];
|
||||
};
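
// Illustrative examples (not part of the original file): log2(size) / 10
// picks the power-of-1024 bucket, so convertSize(512) === "512 bytes",
// convertSize(1536) === "1.5 KB", and convertSize(5 * 1024 * 1024) === "5 MB".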

const fileExists = async (a) => {
  try {
    await promisify(access)(a, constants.F_OK);
    return true;
  } catch (error) {
    // File does not exist; no need to log an error
    return false;
  }
};

const fileExistsSync = (a) => {
  try {
    return existsSync(a);
  } catch (error) {
    console.error(error);
    return false;
  }
};

const moveFile = async (a, b) => {
  try {
    await promisify(rename)(a, b);
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
};

const moveFileSync = (a, b) => {
  try {
    renameSync(a, b);
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
};

const deleteFile = async (filepath) => {
  try {
    await promisify(unlink)(filepath);
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
};

const deleteFileSync = (filepath) => {
  try {
    unlinkSync(filepath);
    return true;
  } catch (error) {
    console.error(error);
    return false;
  }
};

const replaceFile = async (a, b) => {
  if (!await moveFile(a, a + ".old")) return false;

  if (!await moveFile(b, a)) {
    await moveFile(a + ".old", a);
    return false;
  }

  await deleteFile(a + ".old");

  return true;
};

const replaceFileSync = (a, b) => {
  if (!moveFileSync(a, a + ".old")) return false;

  if (!moveFileSync(b, a)) {
    moveFileSync(a + ".old", a);
    return false;
  }

  deleteFileSync(a + ".old");

  return true;
};

const deleteDirectory = async (dirpath) => {
  try {
    await promisify(rmdir)(dirpath);
    return true;
  } catch {
    return false;
  }
};

const deleteDirectorySync = (dirpath) => {
  try {
    rmdirSync(dirpath);
    return true;
  } catch {
    return false;
  }
};

const releaseLock = async (store, release) => {
  try {
    await release();
  } catch (error) {
    if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) {
      error.store = store;
      throw error;
    }
  }
};

const releaseLockSync = (store, release) => {
  try {
    release();
  } catch (error) {
    if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) {
      error.store = store;
      throw error;
    }
  }
};

exports.min = min;
exports.max = max;

exports.convertSize = convertSize;

exports.fileExists = fileExists;
exports.fileExistsSync = fileExistsSync;
exports.moveFile = moveFile;
exports.moveFileSync = moveFileSync;
exports.replaceFile = replaceFile;
exports.replaceFileSync = replaceFileSync;
exports.deleteFile = deleteFile;
exports.deleteFileSync = deleteFileSync;
exports.deleteDirectory = deleteDirectory;
exports.deleteDirectorySync = deleteDirectorySync;

exports.releaseLock = releaseLock;
exports.releaseLockSync = releaseLockSync;

@ -1,70 +0,0 @@

"use strict";
|
||||
|
||||
const { existsSync } = require("graceful-fs");
|
||||
|
||||
const validateSize = (s) => {
|
||||
if (typeof s !== "number") {
|
||||
throw new TypeError("Size must be a number");
|
||||
} else if (s <= 0) {
|
||||
throw new RangeError("Size must be greater than zero");
|
||||
}
|
||||
|
||||
return s;
|
||||
};
|
||||
|
||||
const validateName = (n) => {
|
||||
if (typeof n !== "string") {
|
||||
throw new TypeError("Name must be a string");
|
||||
} else if (n.trim().length <= 0) {
|
||||
throw new Error("Name must be a non-blank string")
|
||||
}
|
||||
|
||||
return n;
|
||||
};
|
||||
|
||||
const validatePath = (p) => {
|
||||
if (typeof p !== "string") {
|
||||
throw new TypeError("Path must be a string");
|
||||
} else if (p.trim().length <= 0) {
|
||||
throw new Error("Path must be a non-blank string");
|
||||
} else if (!existsSync(p)) {
|
||||
throw new Error("Path does not exist");
|
||||
}
|
||||
|
||||
return p;
|
||||
};
|
||||
|
||||
const validateArray = (a) => {
|
||||
if (!Array.isArray(a)) {
|
||||
throw new TypeError("Not an array");
|
||||
}
|
||||
|
||||
return a;
|
||||
};
|
||||
|
||||
const validateObject = (o) => {
|
||||
if (typeof o !== "object") {
|
||||
throw new TypeError("Not an object");
|
||||
}
|
||||
|
||||
return o;
|
||||
};
|
||||
|
||||
const validateFunction = (f) => {
|
||||
if (typeof f !== "function") {
|
||||
throw new TypeError("Not a function")
|
||||
}
|
||||
// } else {
|
||||
// const fString = f.toString();
|
||||
// if (/\s*function/.test(fString) && !/\W+return\W+/.test(fString)) throw new Error("Function must return a value");
|
||||
// }
|
||||
|
||||
return f;
|
||||
}
|
||||
|
||||
exports.validateSize = validateSize;
|
||||
exports.validateName = validateName;
|
||||
exports.validatePath = validatePath;
|
||||
exports.validateArray = validateArray;
|
||||
exports.validateObject = validateObject;
|
||||
exports.validateFunction = validateFunction;
|
@ -1,21 +0,0 @@
|
||||
The MIT License (MIT)

Copyright (c) 2018 Made With MOXY Lda <hello@moxy.studio>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -1,46 +0,0 @@

'use strict';

//
// used by njodb
// Source: https://github.com/moxystudio/node-proper-lockfile
//

const lockfile = require('./lib/lockfile');
const { toPromise, toSync, toSyncOptions } = require('./lib/adapter');

async function lock(file, options) {
  const release = await toPromise(lockfile.lock)(file, options);

  return toPromise(release);
}
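
// Usage sketch (illustrative, not part of the original file): lock() resolves
// to a release function that is itself promisified, so callers can do:
// const release = await lock('/path/to/store', { stale: 10000 });
// try { /* read/modify/write the store */ } finally { await release(); }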

function lockSync(file, options) {
  const release = toSync(lockfile.lock)(file, toSyncOptions(options));

  return toSync(release);
}

function unlock(file, options) {
  return toPromise(lockfile.unlock)(file, options);
}

function unlockSync(file, options) {
  return toSync(lockfile.unlock)(file, toSyncOptions(options));
}

function check(file, options) {
  return toPromise(lockfile.check)(file, options);
}

function checkSync(file, options) {
  return toSync(lockfile.check)(file, toSyncOptions(options));
}

module.exports = lock;
module.exports.lock = lock;
module.exports.unlock = unlock;
module.exports.lockSync = lockSync;
module.exports.unlockSync = unlockSync;
module.exports.check = check;
module.exports.checkSync = checkSync;

@ -1,85 +0,0 @@

'use strict';

const fs = require('graceful-fs');

function createSyncFs(fs) {
  const methods = ['mkdir', 'realpath', 'stat', 'rmdir', 'utimes'];
  const newFs = { ...fs };

  methods.forEach((method) => {
    newFs[method] = (...args) => {
      const callback = args.pop();
      let ret;

      try {
        ret = fs[`${method}Sync`](...args);
      } catch (err) {
        return callback(err);
      }

      callback(null, ret);
    };
  });

  return newFs;
}

// ----------------------------------------------------------

function toPromise(method) {
  return (...args) => new Promise((resolve, reject) => {
    args.push((err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result);
      }
    });

    method(...args);
  });
}

function toSync(method) {
  return (...args) => {
    let err;
    let result;

    args.push((_err, _result) => {
      err = _err;
      result = _result;
    });

    method(...args);

    if (err) {
      throw err;
    }

    return result;
  };
}
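
// Illustrative note (not part of the original file): toSync only works when
// the wrapped method invokes its callback synchronously, since the captured
// err/result must be set before toSync returns. That is exactly what
// createSyncFs guarantees by delegating to the *Sync fs variants, which is
// why toSyncOptions below swaps the sync-shimmed fs into the options.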

function toSyncOptions(options) {
  // Shallow clone options because we are going to mutate them
  options = { ...options };

  // Transform fs to use the sync methods instead
  options.fs = createSyncFs(options.fs || fs);

  // Retries are not allowed because they require the flow to be async
  if (
    (typeof options.retries === 'number' && options.retries > 0) ||
    (options.retries && typeof options.retries.retries === 'number' && options.retries.retries > 0)
  ) {
    throw Object.assign(new Error('Cannot use retries with the sync api'), { code: 'ESYNC' });
  }

  return options;
}

module.exports = {
  toPromise,
  toSync,
  toSyncOptions,
};

@ -1,345 +0,0 @@

'use strict';

const path = require('path');
const fs = require('graceful-fs');
const retry = require('../../retry');
const onExit = require('../../signalExit');
const mtimePrecision = require('./mtime-precision');

const locks = {};

function getLockFile(file, options) {
  return options.lockfilePath || `${file}.lock`;
}

function resolveCanonicalPath(file, options, callback) {
  if (!options.realpath) {
    return callback(null, path.resolve(file));
  }

  // Use realpath to resolve symlinks
  // It also resolves relative paths
  options.fs.realpath(file, callback);
}

function acquireLock(file, options, callback) {
  const lockfilePath = getLockFile(file, options);

  // Use mkdir to create the lockfile (atomic operation)
  options.fs.mkdir(lockfilePath, (err) => {
    if (!err) {
      // At this point, we acquired the lock!
      // Probe the mtime precision
      return mtimePrecision.probe(lockfilePath, options.fs, (err, mtime, mtimePrecision) => {
        // If it failed, try to remove the lock..
        /* istanbul ignore if */
        if (err) {
          options.fs.rmdir(lockfilePath, () => { });

          return callback(err);
        }

        callback(null, mtime, mtimePrecision);
      });
    }

    // If the error is not EEXIST then some other error occurred while locking
    if (err.code !== 'EEXIST') {
      return callback(err);
    }

    // Otherwise, check if the lock is stale by analyzing the file mtime
    if (options.stale <= 0) {
      return callback(Object.assign(new Error('Lock file is already being held'), { code: 'ELOCKED', file }));
    }

    options.fs.stat(lockfilePath, (err, stat) => {
      if (err) {
        // Retry if the lockfile has been removed (meanwhile)
        // Skip the stale check to avoid recursiveness
        if (err.code === 'ENOENT') {
          return acquireLock(file, { ...options, stale: 0 }, callback);
        }

        return callback(err);
      }

      if (!isLockStale(stat, options)) {
        return callback(Object.assign(new Error('Lock file is already being held'), { code: 'ELOCKED', file }));
      }

      // If it's stale, remove it and try again!
      // Skip the stale check to avoid recursiveness
      removeLock(file, options, (err) => {
        if (err) {
          return callback(err);
        }

        acquireLock(file, { ...options, stale: 0 }, callback);
      });
    });
  });
}
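
// Illustrative note (not part of the original file): mkdir is the lock
// primitive because directory creation is atomic on both POSIX and Windows.
// Of two concurrent mkdir('<file>.lock') calls, exactly one succeeds; the
// loser receives EEXIST and falls through to the staleness check above.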

function isLockStale(stat, options) {
  return stat.mtime.getTime() < Date.now() - options.stale;
}

function removeLock(file, options, callback) {
  // Remove the lockfile, ignoring ENOENT errors
  options.fs.rmdir(getLockFile(file, options), (err) => {
    if (err && err.code !== 'ENOENT') {
      return callback(err);
    }

    callback();
  });
}

function updateLock(file, options) {
  const lock = locks[file];

  // Just for safety, should never happen
  /* istanbul ignore if */
  if (lock.updateTimeout) {
    return;
  }

  lock.updateDelay = lock.updateDelay || options.update;
  lock.updateTimeout = setTimeout(() => {
    lock.updateTimeout = null;

    // Stat the file to check if the mtime is still ours
    // If it is, we can still recover from a system sleep or a busy event loop
    options.fs.stat(lock.lockfilePath, (err, stat) => {
      const isOverThreshold = lock.lastUpdate + options.stale < Date.now();

      // If it failed to update the lockfile, keep trying unless
      // the lockfile was deleted or we are over the threshold
      if (err) {
        if (err.code === 'ENOENT' || isOverThreshold) {
          console.error(`lockfile "${file}" compromised. stat code=${err.code}, isOverThreshold=${isOverThreshold}`);
          return setLockAsCompromised(file, lock, Object.assign(err, { code: 'ECOMPROMISED' }));
        }

        lock.updateDelay = 1000;

        return updateLock(file, options);
      }

      const isMtimeOurs = lock.mtime.getTime() === stat.mtime.getTime();

      if (!isMtimeOurs) {
        console.error(`lockfile "${file}" compromised. mtime is not ours`);
        return setLockAsCompromised(
          file,
          lock,
          Object.assign(
            new Error('Unable to update lock within the stale threshold'),
            { code: 'ECOMPROMISED' }
          ));
      }

      const mtime = mtimePrecision.getMtime(lock.mtimePrecision);

      options.fs.utimes(lock.lockfilePath, mtime, mtime, (err) => {
        const isOverThreshold = lock.lastUpdate + options.stale < Date.now();

        // Ignore if the lock was released
        if (lock.released) {
          return;
        }

        // If it failed to update the lockfile, keep trying unless
        // the lockfile was deleted or we are over the threshold
        if (err) {
          if (err.code === 'ENOENT' || isOverThreshold) {
            console.error(`lockfile "${file}" compromised. utimes code=${err.code}, isOverThreshold=${isOverThreshold}`);
            return setLockAsCompromised(file, lock, Object.assign(err, { code: 'ECOMPROMISED' }));
          }

          lock.updateDelay = 1000;

          return updateLock(file, options);
        }

        // All ok, keep updating..
        lock.mtime = mtime;
        lock.lastUpdate = Date.now();
        lock.updateDelay = null;
        updateLock(file, options);
      });
    });
  }, lock.updateDelay);

  // Unref the timer so that the nodejs process can exit freely
  // This is safe because all acquired locks will be automatically released
  // on process exit

  // We first check that `lock.updateTimeout.unref` exists because some users
  // may be using this module outside of NodeJS (e.g., in an electron app),
  // and in those cases `setTimeout` returns an integer.
  /* istanbul ignore else */
  if (lock.updateTimeout.unref) {
    lock.updateTimeout.unref();
  }
}

function setLockAsCompromised(file, lock, err) {
  // Signal that the lock has been released
  lock.released = true;

  // Cancel the lock mtime update
  // Just for safety, at this point updateTimeout should be null
  /* istanbul ignore if */
  if (lock.updateTimeout) {
    clearTimeout(lock.updateTimeout);
  }

  if (locks[file] === lock) {
    delete locks[file];
  }

  lock.options.onCompromised(err);
}

// ----------------------------------------------------------

function lock(file, options, callback) {
  /* istanbul ignore next */
  options = {
    stale: 10000,
    update: null,
    realpath: true,
    retries: 0,
    fs,
    onCompromised: (err) => { throw err; },
    ...options,
  };

  options.retries = options.retries || 0;
  options.retries = typeof options.retries === 'number' ? { retries: options.retries } : options.retries;
  options.stale = Math.max(options.stale || 0, 2000);
  options.update = options.update == null ? options.stale / 2 : options.update || 0;
  options.update = Math.max(Math.min(options.update, options.stale / 2), 1000);

  // Resolve to a canonical file path
  resolveCanonicalPath(file, options, (err, file) => {
    if (err) {
      return callback(err);
    }

    // Attempt to acquire the lock
    const operation = retry.operation(options.retries);

    operation.attempt(() => {
      acquireLock(file, options, (err, mtime, mtimePrecision) => {
        if (operation.retry(err)) {
          return;
        }

        if (err) {
          return callback(operation.mainError());
        }

        // We now own the lock
        const lock = locks[file] = {
          lockfilePath: getLockFile(file, options),
          mtime,
          mtimePrecision,
          options,
          lastUpdate: Date.now(),
        };

        // We must keep the lock fresh to avoid staleness
        updateLock(file, options);

        callback(null, (releasedCallback) => {
          if (lock.released) {
            return releasedCallback &&
              releasedCallback(Object.assign(new Error('Lock is already released'), { code: 'ERELEASED' }));
          }

          // Not necessary to use realpath twice when unlocking
          unlock(file, { ...options, realpath: false }, releasedCallback);
        });
      });
    });
  });
}

function unlock(file, options, callback) {
  options = {
    fs,
    realpath: true,
    ...options,
  };

  // Resolve to a canonical file path
  resolveCanonicalPath(file, options, (err, file) => {
    if (err) {
      return callback(err);
    }

    // Skip if the lock is not acquired
    const lock = locks[file];

    if (!lock) {
      return callback(Object.assign(new Error('Lock is not acquired/owned by you'), { code: 'ENOTACQUIRED' }));
    }

    lock.updateTimeout && clearTimeout(lock.updateTimeout); // Cancel the lock mtime update
    lock.released = true; // Signal that the lock has been released
    delete locks[file]; // Delete from locks

    removeLock(file, options, callback);
  });
}

function check(file, options, callback) {
  options = {
    stale: 10000,
    realpath: true,
    fs,
    ...options,
  };

  options.stale = Math.max(options.stale || 0, 2000);

  // Resolve to a canonical file path
  resolveCanonicalPath(file, options, (err, file) => {
    if (err) {
      return callback(err);
    }

    // Check if the lockfile exists
    options.fs.stat(getLockFile(file, options), (err, stat) => {
      if (err) {
        // If it does not exist, the file is not locked. Otherwise, callback with the error
        return err.code === 'ENOENT' ? callback(null, false) : callback(err);
      }

      // Otherwise, check if the lock is stale by analyzing the file mtime
      return callback(null, !isLockStale(stat, options));
    });
  });
}

function getLocks() {
  return locks;
}

// Remove acquired locks on exit
/* istanbul ignore next */
onExit(() => {
  for (const file in locks) {
    const options = locks[file].options;

    try { options.fs.rmdirSync(getLockFile(file, options)); } catch (e) { /* Empty */ }
  }
});

module.exports.lock = lock;
module.exports.unlock = unlock;
module.exports.check = check;
module.exports.getLocks = getLocks;

@ -1,55 +0,0 @@

'use strict';

const cacheSymbol = Symbol();

function probe(file, fs, callback) {
  const cachedPrecision = fs[cacheSymbol];

  if (cachedPrecision) {
    return fs.stat(file, (err, stat) => {
      /* istanbul ignore if */
      if (err) {
        return callback(err);
      }

      callback(null, stat.mtime, cachedPrecision);
    });
  }

  // Set the mtime by ceiling Date.now() to seconds + 5ms so that it's "not on the second"
  const mtime = new Date((Math.ceil(Date.now() / 1000) * 1000) + 5);

  fs.utimes(file, mtime, mtime, (err) => {
    /* istanbul ignore if */
    if (err) {
      return callback(err);
    }

    fs.stat(file, (err, stat) => {
      /* istanbul ignore if */
      if (err) {
        return callback(err);
      }

      const precision = stat.mtime.getTime() % 1000 === 0 ? 's' : 'ms';

      // Cache the precision in a non-enumerable way
      Object.defineProperty(fs, cacheSymbol, { value: precision });

      callback(null, stat.mtime, precision);
    });
  });
}
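
// Illustrative note (not part of the original file): writing an mtime that is
// deliberately 5ms off the whole second and reading it back reveals whether
// the filesystem stores sub-second timestamps. If the 5ms survives, precision
// is 'ms'; if the filesystem truncates to whole seconds, precision is 's',
// and getMtime below rounds up to the second so mtime comparisons stay exact.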

function getMtime(precision) {
  let now = Date.now();

  if (precision === 's') {
    now = Math.ceil(now / 1000) * 1000;
  }

  return new Date(now);
}

module.exports.probe = probe;
module.exports.getMtime = getMtime;

@ -1,21 +0,0 @@

Copyright (c) 2011:
Tim Koschützki (tim@debuggable.com)
Felix Geisendörfer (felix@debuggable.com)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -1,105 +0,0 @@

//
// used by properLockFile
// Source: https://github.com/tim-kos/node-retry
//

var RetryOperation = require('./retry_operation');

exports.operation = function (options) {
  var timeouts = exports.timeouts(options);
  return new RetryOperation(timeouts, {
    forever: options && options.forever,
    unref: options && options.unref,
    maxRetryTime: options && options.maxRetryTime
  });
};

exports.timeouts = function (options) {
  if (options instanceof Array) {
    return [].concat(options);
  }

  var opts = {
    retries: 10,
    factor: 2,
    minTimeout: 1 * 1000,
    maxTimeout: Infinity,
    randomize: false
  };
  for (var key in options) {
    opts[key] = options[key];
  }

  if (opts.minTimeout > opts.maxTimeout) {
    throw new Error('minTimeout is greater than maxTimeout');
  }

  var timeouts = [];
  for (var i = 0; i < opts.retries; i++) {
    timeouts.push(this.createTimeout(i, opts));
  }

  if (options && options.forever && !timeouts.length) {
    timeouts.push(this.createTimeout(i, opts));
  }

  // Sort the array numerically ascending
  timeouts.sort(function (a, b) {
    return a - b;
  });

  return timeouts;
};

exports.createTimeout = function (attempt, opts) {
  var random = (opts.randomize)
    ? (Math.random() + 1)
    : 1;

  var timeout = Math.round(random * opts.minTimeout * Math.pow(opts.factor, attempt));
  timeout = Math.min(timeout, opts.maxTimeout);

  return timeout;
};
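
// Worked example (illustrative, not part of the original file): with the
// defaults above (retries 10, factor 2, minTimeout 1000, no randomization)
// the timeout for attempt n is 1000 * 2^n ms, so the retry schedule is
// 1s, 2s, 4s, 8s, ... capped at maxTimeout. Setting randomize: true scales
// each delay by a factor in [1, 2) to spread out contending retriers.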

exports.wrap = function (obj, options, methods) {
  if (options instanceof Array) {
    methods = options;
    options = null;
  }

  if (!methods) {
    methods = [];
    for (var key in obj) {
      if (typeof obj[key] === 'function') {
        methods.push(key);
      }
    }
  }

  for (var i = 0; i < methods.length; i++) {
    var method = methods[i];
    var original = obj[method];

    obj[method] = function retryWrapper(original) {
      var op = exports.operation(options);
      var args = Array.prototype.slice.call(arguments, 1);
      var callback = args.pop();

      args.push(function (err) {
        if (op.retry(err)) {
          return;
        }
        if (err) {
          arguments[0] = op.mainError();
        }
        callback.apply(this, arguments);
      });

      op.attempt(function () {
        original.apply(obj, args);
      });
    }.bind(obj, original);
    obj[method].options = options;
  }
};

@ -1,158 +0,0 @@

function RetryOperation(timeouts, options) {
  // Compatibility for the old (timeouts, retryForever) signature
  if (typeof options === 'boolean') {
    options = { forever: options };
  }

  this._originalTimeouts = JSON.parse(JSON.stringify(timeouts));
  this._timeouts = timeouts;
  this._options = options || {};
  this._maxRetryTime = options && options.maxRetryTime || Infinity;
  this._fn = null;
  this._errors = [];
  this._attempts = 1;
  this._operationTimeout = null;
  this._operationTimeoutCb = null;
  this._timeout = null;
  this._operationStart = null;

  if (this._options.forever) {
    this._cachedTimeouts = this._timeouts.slice(0);
  }
}
module.exports = RetryOperation;

RetryOperation.prototype.reset = function () {
  this._attempts = 1;
  this._timeouts = this._originalTimeouts;
};

RetryOperation.prototype.stop = function () {
  if (this._timeout) {
    clearTimeout(this._timeout);
  }

  this._timeouts = [];
  this._cachedTimeouts = null;
};

RetryOperation.prototype.retry = function (err) {
  if (this._timeout) {
    clearTimeout(this._timeout);
  }

  if (!err) {
    return false;
  }
  var currentTime = new Date().getTime();
  if (err && currentTime - this._operationStart >= this._maxRetryTime) {
    this._errors.unshift(new Error('RetryOperation timeout occurred'));
    return false;
  }

  this._errors.push(err);

  var timeout = this._timeouts.shift();
  if (timeout === undefined) {
    if (this._cachedTimeouts) {
      // Retry forever, only keep the last error
      this._errors.splice(this._errors.length - 1, this._errors.length);
      this._timeouts = this._cachedTimeouts.slice(0);
      timeout = this._timeouts.shift();
    } else {
      return false;
    }
  }

  var self = this;
  var timer = setTimeout(function () {
    self._attempts++;

    if (self._operationTimeoutCb) {
      self._timeout = setTimeout(function () {
        self._operationTimeoutCb(self._attempts);
      }, self._operationTimeout);

      if (self._options.unref) {
        self._timeout.unref();
      }
    }

    self._fn(self._attempts);
  }, timeout);

  if (this._options.unref) {
    timer.unref();
  }

  return true;
};

RetryOperation.prototype.attempt = function (fn, timeoutOps) {
  this._fn = fn;

  if (timeoutOps) {
    if (timeoutOps.timeout) {
      this._operationTimeout = timeoutOps.timeout;
    }
    if (timeoutOps.cb) {
      this._operationTimeoutCb = timeoutOps.cb;
    }
  }

  var self = this;
  if (this._operationTimeoutCb) {
    this._timeout = setTimeout(function () {
      self._operationTimeoutCb();
    }, self._operationTimeout);
  }

  this._operationStart = new Date().getTime();

  this._fn(this._attempts);
};

RetryOperation.prototype.try = function (fn) {
  console.log('Using RetryOperation.try() is deprecated');
  this.attempt(fn);
};

RetryOperation.prototype.start = function (fn) {
  console.log('Using RetryOperation.start() is deprecated');
  this.attempt(fn);
};

RetryOperation.prototype.start = RetryOperation.prototype.try;

RetryOperation.prototype.errors = function () {
  return this._errors;
};

RetryOperation.prototype.attempts = function () {
  return this._attempts;
};

RetryOperation.prototype.mainError = function () {
  if (this._errors.length === 0) {
    return null;
  }

  var counts = {};
  var mainError = null;
  var mainErrorCount = 0;

  for (var i = 0; i < this._errors.length; i++) {
    var error = this._errors[i];
    var message = error.message;
    var count = (counts[message] || 0) + 1;

    counts[message] = count;

    if (count >= mainErrorCount) {
      mainError = error;
      mainErrorCount = count;
    }
  }

  return mainError;
};

@ -1,16 +0,0 @@

The ISC License

Copyright (c) 2015-2022 Benjamin Coe, Isaac Z. Schlueter, and Contributors

Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice
appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE
LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES
OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

@ -1,207 +0,0 @@

//
|
||||
// used by properLockFile
|
||||
// Source: https://github.com/tapjs/signal-exit
|
||||
//
|
||||
|
||||
// Note: since nyc uses this module to output coverage, any lines
|
||||
// that are in the direct sync flow of nyc's outputCoverage are
|
||||
// ignored, since we can never get coverage for them.
|
||||
// grab a reference to node's real process object right away
|
||||
var process = global.process
|
||||
|
||||
const processOk = function (process) {
|
||||
return process &&
|
||||
typeof process === 'object' &&
|
||||
typeof process.removeListener === 'function' &&
|
||||
typeof process.emit === 'function' &&
|
||||
typeof process.reallyExit === 'function' &&
|
||||
typeof process.listeners === 'function' &&
|
||||
typeof process.kill === 'function' &&
|
||||
typeof process.pid === 'number' &&
|
||||
typeof process.on === 'function'
|
||||
}
|
||||
|
||||
// some kind of non-node environment, just no-op
|
||||
/* istanbul ignore if */
|
||||
if (!processOk(process)) {
|
||||
module.exports = function () {
|
||||
return function () { }
|
||||
}
|
||||
} else {
|
||||
var assert = require('assert')
|
||||
var signals = require('./signals.js')
|
||||
var isWin = /^win/i.test(process.platform)
|
||||
|
||||
var EE = require('events')
|
||||
/* istanbul ignore if */
|
||||
if (typeof EE !== 'function') {
|
||||
EE = EE.EventEmitter
|
||||
}
|
||||
|
||||
var emitter
|
||||
if (process.__signal_exit_emitter__) {
|
||||
emitter = process.__signal_exit_emitter__
|
||||
} else {
|
||||
emitter = process.__signal_exit_emitter__ = new EE()
|
||||
emitter.count = 0
|
||||
emitter.emitted = {}
|
||||
}
|
||||
|
||||
// Because this emitter is a global, we have to check to see if a
|
||||
// previous version of this library failed to enable infinite listeners.
|
||||
// I know what you're about to say. But literally everything about
|
||||
// signal-exit is a compromise with evil. Get used to it.
|
||||
if (!emitter.infinite) {
|
||||
emitter.setMaxListeners(Infinity)
|
||||
emitter.infinite = true
|
||||
}
|
||||
|
||||
module.exports = function (cb, opts) {
|
||||
/* istanbul ignore if */
|
||||
if (!processOk(global.process)) {
|
||||
return function () { }
|
||||
}
|
||||
assert.equal(typeof cb, 'function', 'a callback must be provided for exit handler')
|
||||
|
||||
if (loaded === false) {
|
||||
load()
|
||||
}
|
||||
|
||||
var ev = 'exit'
|
||||
if (opts && opts.alwaysLast) {
|
||||
ev = 'afterexit'
|
||||
}
|
  var remove = function () {
    emitter.removeListener(ev, cb)
    if (emitter.listeners('exit').length === 0 &&
        emitter.listeners('afterexit').length === 0) {
      unload()
    }
  }
  emitter.on(ev, cb)

  return remove
}

var unload = function unload() {
  if (!loaded || !processOk(global.process)) {
    return
  }
  loaded = false

  signals.forEach(function (sig) {
    try {
      process.removeListener(sig, sigListeners[sig])
    } catch (er) { }
  })
  process.emit = originalProcessEmit
  process.reallyExit = originalProcessReallyExit
  emitter.count -= 1
}
module.exports.unload = unload

var emit = function emit(event, code, signal) {
  /* istanbul ignore if */
  if (emitter.emitted[event]) {
    return
  }
  emitter.emitted[event] = true
  emitter.emit(event, code, signal)
}

// { <signal>: <listener fn>, ... }
var sigListeners = {}
signals.forEach(function (sig) {
  sigListeners[sig] = function listener() {
    /* istanbul ignore if */
    if (!processOk(global.process)) {
      return
    }
    // If there are no other listeners, an exit is coming!
    // Simplest way: remove us and then re-send the signal.
    // We know that this will kill the process, so we can
    // safely emit now.
    var listeners = process.listeners(sig)
    if (listeners.length === emitter.count) {
      unload()
      emit('exit', null, sig)
      /* istanbul ignore next */
      emit('afterexit', null, sig)
      /* istanbul ignore next */
      if (isWin && sig === 'SIGHUP') {
        // "SIGHUP" throws an `ENOSYS` error on Windows,
        // so use a supported signal instead
        sig = 'SIGINT'
      }
      /* istanbul ignore next */
      process.kill(process.pid, sig)
    }
  }
})

module.exports.signals = function () {
  return signals
}

var loaded = false

var load = function load() {
  if (loaded || !processOk(global.process)) {
    return
  }
  loaded = true

  // This is the number of onSignalExit's that are in play.
  // It's important so that we can count the correct number of
  // listeners on signals, and don't wait for the other one to
  // handle it instead of us.
  emitter.count += 1

  signals = signals.filter(function (sig) {
    try {
      process.on(sig, sigListeners[sig])
      return true
    } catch (er) {
      return false
    }
  })

  process.emit = processEmit
  process.reallyExit = processReallyExit
}
module.exports.load = load

var originalProcessReallyExit = process.reallyExit
var processReallyExit = function processReallyExit(code) {
  /* istanbul ignore if */
  if (!processOk(global.process)) {
    return
  }
  process.exitCode = code || /* istanbul ignore next */ 0
  emit('exit', process.exitCode, null)
  /* istanbul ignore next */
  emit('afterexit', process.exitCode, null)
  /* istanbul ignore next */
  originalProcessReallyExit.call(process, process.exitCode)
}

var originalProcessEmit = process.emit
var processEmit = function processEmit(ev, arg) {
  if (ev === 'exit' && processOk(global.process)) {
    /* istanbul ignore else */
    if (arg !== undefined) {
      process.exitCode = arg
    }
    var ret = originalProcessEmit.apply(this, arguments)
    /* istanbul ignore next */
    emit('exit', process.exitCode, null)
    /* istanbul ignore next */
    emit('afterexit', process.exitCode, null)
    /* istanbul ignore next */
    return ret
  } else {
    return originalProcessEmit.apply(this, arguments)
  }
}
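For reference, the block above is the tail of the vendored signal-exit helper being deleted along with njodb's dependency tree. A minimal sketch of the API it exposed follows; the require path is an assumption (the module was vendored somewhere under server/libs), not code from this commit.

// Sketch only: the removed module exported a function that registers an
// exit/fatal-signal callback and returns an unsubscribe function.
var onExit = require('./libs/signalExit') // assumed vendored path

var remove = onExit(function (code, signal) {
  // Exactly one of code/signal is non-null, depending on whether the
  // process exited normally or was killed by a signal.
  console.log('process exiting', code, signal)
})

// Removing the last listener triggers unload(), which restores the
// original process.emit and process.reallyExit patched in load().
remove()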
@ -1,53 +0,0 @@
// This is not the set of all possible signals.
//
// It IS, however, the set of all signals that trigger
// an exit on either Linux or BSD systems. Linux is a
// superset of the signal names supported on BSD, and
// the unknown signals just fail to register, so we can
// catch that easily enough.
//
// Don't bother with SIGKILL. It's uncatchable, which
// means that we can't fire any callbacks anyway.
//
// If a user does happen to register a handler on a non-
// fatal signal like SIGWINCH or something, and then
// exit, it'll end up firing `process.emit('exit')`, so
// the handler will be fired anyway.
//
// SIGBUS, SIGFPE, SIGSEGV and SIGILL, when not raised
// artificially, inherently leave the process in a
// state from which it is not safe to try and enter JS
// listeners.
module.exports = [
  'SIGABRT',
  'SIGALRM',
  'SIGHUP',
  'SIGINT',
  'SIGTERM'
]

if (process.platform !== 'win32') {
  module.exports.push(
    'SIGVTALRM',
    'SIGXCPU',
    'SIGXFSZ',
    'SIGUSR2',
    'SIGTRAP',
    'SIGSYS',
    'SIGQUIT',
    'SIGIOT'
    // should detect profiler and enable/disable accordingly.
    // see #21
    // 'SIGPROF'
  )
}

if (process.platform === 'linux') {
  module.exports.push(
    'SIGIO',
    'SIGPOLL',
    'SIGPWR',
    'SIGSTKFLT',
    'SIGUNUSED'
  )
}
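The list above is consumed by load() in the previous file, which registers one listener per signal and drops any the platform rejects. A small sketch of that defensive pattern; the require path and handler body are illustrative assumptions.

// Sketch: register a handler for every exported signal, skipping any the
// current platform refuses - process.on throws for unregistrable signals,
// which is exactly what the comment block above relies on.
var signals = require('./libs/signalExit/signals') // assumed vendored path

var registered = signals.filter(function (sig) {
  try {
    process.on(sig, function () { /* fatal-signal cleanup */ })
    return true
  } catch (er) {
    return false // e.g. a Linux-only signal on another kernel
  }
})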
@ -1,410 +0,0 @@
const Path = require('path')
const fs = require('../libs/fsExtra')
const njodb = require('../libs/njodb')

const { SupportedEbookTypes } = require('./globals')
const { PlayMethod } = require('./constants')
const { getId } = require('./index')
const { filePathToPOSIX } = require('./fileUtils')
const Logger = require('../Logger')

const Library = require('../objects/Library')
const LibraryItem = require('../objects/LibraryItem')
const Book = require('../objects/mediaTypes/Book')

const BookMetadata = require('../objects/metadata/BookMetadata')
const FileMetadata = require('../objects/metadata/FileMetadata')

const AudioFile = require('../objects/files/AudioFile')
const EBookFile = require('../objects/files/EBookFile')
const LibraryFile = require('../objects/files/LibraryFile')
const AudioMetaTags = require('../objects/metadata/AudioMetaTags')

const Author = require('../objects/entities/Author')
const Series = require('../objects/entities/Series')

const MediaProgress = require('../objects/user/MediaProgress')
const PlaybackSession = require('../objects/PlaybackSession')

const { isObject } = require('.')
const User = require('../objects/user/User')

var authorsToAdd = []
var existingDbAuthors = []
var seriesToAdd = []
var existingDbSeries = []

// Load old audiobooks
async function loadAudiobooks() {
  var audiobookPath = Path.join(global.ConfigPath, 'audiobooks')

  Logger.debug(`[dbMigration] loadAudiobooks path ${audiobookPath}`)
  var pathExists = await fs.pathExists(audiobookPath)
  if (!pathExists) {
    Logger.debug(`[dbMigration] loadAudiobooks path does not exist ${audiobookPath}`)
    return []
  }

  var audiobooksDb = new njodb.Database(audiobookPath)
  return audiobooksDb.select(() => true).then((results) => {
    Logger.debug(`[dbMigration] loadAudiobooks select results ${results.data.length}`)
    return results.data
  })
}

function makeAuthorsFromOldAb(authorsList) {
  return authorsList.filter(a => !!a).map(authorName => {
    var existingAuthor = authorsToAdd.find(a => a.name.toLowerCase() === authorName.toLowerCase())
    if (existingAuthor) {
      return existingAuthor.toJSONMinimal()
    }
    var existingDbAuthor = existingDbAuthors.find(a => a.name.toLowerCase() === authorName.toLowerCase())
    if (existingDbAuthor) {
      return existingDbAuthor.toJSONMinimal()
    }

    var newAuthor = new Author()
    newAuthor.setData({ name: authorName })
    authorsToAdd.push(newAuthor)
    // Logger.debug(`>>> Created new author named "${authorName}"`)
    return newAuthor.toJSONMinimal()
  })
}

function makeSeriesFromOldAb({ series, volumeNumber }) {
  var existingSeries = seriesToAdd.find(s => s.name.toLowerCase() === series.toLowerCase())
  if (existingSeries) {
    return [existingSeries.toJSONMinimal(volumeNumber)]
  }
  var existingDbSeriesItem = existingDbSeries.find(s => s.name.toLowerCase() === series.toLowerCase())
  if (existingDbSeriesItem) {
    return [existingDbSeriesItem.toJSONMinimal(volumeNumber)]
  }
  var newSeries = new Series()
  newSeries.setData({ name: series })
  seriesToAdd.push(newSeries)
  Logger.info(`>>> Created new series named "${series}"`)
  return [newSeries.toJSONMinimal(volumeNumber)]
}

function getRelativePath(srcPath, basePath) {
  srcPath = filePathToPOSIX(srcPath)
  basePath = filePathToPOSIX(basePath)
  return srcPath.replace(basePath, '')
}

function makeFilesFromOldAb(audiobook) {
  var libraryFiles = []
  var ebookFiles = []

  var _audioFiles = audiobook.audioFiles || []
  var audioFiles = _audioFiles.map((af) => {
    var fileMetadata = new FileMetadata(af)
    fileMetadata.path = af.fullPath
    fileMetadata.relPath = getRelativePath(af.fullPath, audiobook.fullPath)

    var newLibraryFile = new LibraryFile()
    newLibraryFile.ino = af.ino
    newLibraryFile.metadata = fileMetadata.clone()
    newLibraryFile.addedAt = af.addedAt
    newLibraryFile.updatedAt = Date.now()
    libraryFiles.push(newLibraryFile)

    var audioMetaTags = new AudioMetaTags(af.metadata || {}) // Old metaTags was named metadata
    delete af.metadata

    var newAudioFile = new AudioFile(af)
    newAudioFile.metadata = fileMetadata
    newAudioFile.metaTags = audioMetaTags
    newAudioFile.updatedAt = Date.now()
    return newAudioFile
  })

  var _otherFiles = audiobook.otherFiles || []
  _otherFiles.forEach((file) => {
    var fileMetadata = new FileMetadata(file)
    fileMetadata.path = file.fullPath
    fileMetadata.relPath = getRelativePath(file.fullPath, audiobook.fullPath)

    var newLibraryFile = new LibraryFile()
    newLibraryFile.ino = file.ino
    newLibraryFile.metadata = fileMetadata.clone()
    newLibraryFile.addedAt = file.addedAt
    newLibraryFile.updatedAt = Date.now()
    libraryFiles.push(newLibraryFile)

    var formatExt = (file.ext || '').slice(1)
    if (SupportedEbookTypes.includes(formatExt)) {
      var newEBookFile = new EBookFile()
      newEBookFile.ino = file.ino
      newEBookFile.metadata = fileMetadata
      newEBookFile.ebookFormat = formatExt
      newEBookFile.addedAt = file.addedAt
      newEBookFile.updatedAt = Date.now()
      ebookFiles.push(newEBookFile)
    }
  })

  return {
    libraryFiles,
    ebookFiles,
    audioFiles
  }
}

// Metadata path was changed to /metadata/items make sure cover is using new path
function cleanOldCoverPath(coverPath) {
  if (!coverPath) return null
  var oldMetadataPath = Path.posix.join(global.MetadataPath, 'books')
  if (coverPath.startsWith(oldMetadataPath)) {
    const newMetadataPath = Path.posix.join(global.MetadataPath, 'items')
    return coverPath.replace(oldMetadataPath, newMetadataPath)
  }
  return coverPath
}
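A worked example of the rewrite this helper performs, with hypothetical paths and assuming global.MetadataPath is '/metadata':

// cleanOldCoverPath('/metadata/books/li_abc/cover.jpg')
//   -> '/metadata/items/li_abc/cover.jpg'   (old metadata dir rewritten)
// cleanOldCoverPath('/audiobooks/Title/cover.jpg')
//   -> '/audiobooks/Title/cover.jpg'        (covers stored elsewhere untouched)
// cleanOldCoverPath(null)
//   -> null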
function makeLibraryItemFromOldAb(audiobook) {
  var libraryItem = new LibraryItem()
  libraryItem.id = audiobook.id
  libraryItem.ino = audiobook.ino
  libraryItem.libraryId = audiobook.libraryId
  libraryItem.folderId = audiobook.folderId
  libraryItem.path = audiobook.fullPath
  libraryItem.relPath = audiobook.path
  libraryItem.mtimeMs = audiobook.mtimeMs || 0
  libraryItem.ctimeMs = audiobook.ctimeMs || 0
  libraryItem.birthtimeMs = audiobook.birthtimeMs || 0
  libraryItem.addedAt = audiobook.addedAt
  libraryItem.updatedAt = audiobook.lastUpdate
  libraryItem.lastScan = audiobook.lastScan
  libraryItem.scanVersion = audiobook.scanVersion
  libraryItem.isMissing = audiobook.isMissing
  libraryItem.mediaType = 'book'

  var bookEntity = new Book()
  var bookMetadata = new BookMetadata(audiobook.book)
  bookMetadata.publishedYear = audiobook.book.publishYear || null
  if (audiobook.book.narrator) {
    bookMetadata.narrators = (audiobook.book.narrator || '').split(', ')
  }
  // Returns array of json minimal authors
  bookMetadata.authors = makeAuthorsFromOldAb((audiobook.book.authorFL || '').split(', '))

  // Returns array of json minimal series
  if (audiobook.book.series) {
    bookMetadata.series = makeSeriesFromOldAb(audiobook.book)
  }

  bookEntity.libraryItemId = libraryItem.id
  bookEntity.metadata = bookMetadata
  bookEntity.coverPath = cleanOldCoverPath(audiobook.book.coverFullPath)
  bookEntity.tags = [...audiobook.tags]

  var payload = makeFilesFromOldAb(audiobook)
  bookEntity.audioFiles = payload.audioFiles
  bookEntity.chapters = []
  if (audiobook.chapters && audiobook.chapters.length) {
    bookEntity.chapters = audiobook.chapters.map(c => ({ ...c }))
  }
  bookEntity.missingParts = audiobook.missingParts || []

  if (payload.ebookFiles.length) {
    bookEntity.ebookFile = payload.ebookFiles[0]
  }

  libraryItem.media = bookEntity
  libraryItem.libraryFiles = payload.libraryFiles
  return libraryItem
}
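To make the old-to-new mapping concrete, a sketch with a hypothetical pre-2.x audiobook record; every field value below is invented for illustration.

// Hypothetical old njodb audiobook record
var oldAb = {
  id: 'ab_123',
  ino: '649123',
  libraryId: 'main',
  folderId: 'audiobooks',
  fullPath: '/audiobooks/Jane Doe/Title', // becomes libraryItem.path
  path: 'Jane Doe/Title',                 // becomes libraryItem.relPath
  addedAt: 1650000000000,
  lastUpdate: 1651000000000,              // becomes libraryItem.updatedAt
  tags: [],
  book: {
    title: 'Title',
    authorFL: 'Jane Doe, John Roe',       // split on ', ' into two authors
    narrator: 'Sam Reader',
    series: 'Saga',
    volumeNumber: '2',                    // carried over as the series sequence
    publishYear: '2001',
    coverFullPath: null
  }
}

var libraryItem = makeLibraryItemFromOldAb(oldAb)
// libraryItem.mediaType === 'book'
// libraryItem.media.metadata.authors -> 2 minimal author JSON objects
// libraryItem.media.metadata.series  -> 1 minimal series JSON object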
async function migrateLibraryItems(db) {
  Logger.info(`==== Starting Library Item migration ====`)

  var audiobooks = await loadAudiobooks()
  if (!audiobooks.length) {
    Logger.info(`>>> No audiobooks in db, no migration necessary`)
    return
  }

  Logger.info(`>>> Loaded old audiobook data with ${audiobooks.length} records`)

  if (db.libraryItems.length) {
    Logger.info(`>>> Some library items already loaded ${db.libraryItems.length} items | ${db.series.length} series | ${db.authors.length} authors`)
    return
  }

  if (db.authors && db.authors.length) {
    existingDbAuthors = db.authors
  }
  if (db.series && db.series.length) {
    existingDbSeries = db.series
  }

  var libraryItems = audiobooks.map((ab) => makeLibraryItemFromOldAb(ab))

  Logger.info(`>>> ${libraryItems.length} Library Items made`)
  await db.bulkInsertEntities('libraryItem', libraryItems)
  if (authorsToAdd.length) {
    Logger.info(`>>> ${authorsToAdd.length} Authors made`)
    await db.bulkInsertEntities('author', authorsToAdd)
  }
  if (seriesToAdd.length) {
    Logger.info(`>>> ${seriesToAdd.length} Series made`)
    await db.insertEntities('series', seriesToAdd)
  }
  existingDbSeries = []
  existingDbAuthors = []
  authorsToAdd = []
  seriesToAdd = []
  Logger.info(`==== Library Item migration complete ====`)
}

function cleanUserObject(db, userObj) {
  var cleanedUserPayload = {
    ...userObj,
    mediaProgress: [],
    bookmarks: []
  }

  // UserAudiobookData is now MediaProgress and AudioBookmarks separated
  if (userObj.audiobooks) {
    for (const audiobookId in userObj.audiobooks) {
      if (isObject(userObj.audiobooks[audiobookId])) {
        // Bookmarks now live on User.js object instead of inside UserAudiobookData
        if (userObj.audiobooks[audiobookId].bookmarks) {
          const cleanedBookmarks = userObj.audiobooks[audiobookId].bookmarks.map((bm) => {
            bm.libraryItemId = audiobookId
            return bm
          })
          cleanedUserPayload.bookmarks = cleanedUserPayload.bookmarks.concat(cleanedBookmarks)
        }

        var userAudiobookData = userObj.audiobooks[audiobookId]
        var liProgress = new MediaProgress() // New Progress Object
        liProgress.id = userAudiobookData.audiobookId
        liProgress.libraryItemId = userAudiobookData.audiobookId
        liProgress.duration = userAudiobookData.totalDuration
        liProgress.isFinished = !!userAudiobookData.isRead
        Object.keys(liProgress.toJSON()).forEach((key) => {
          if (userAudiobookData[key] !== undefined) {
            liProgress[key] = userAudiobookData[key]
          }
        })
        cleanedUserPayload.mediaProgress.push(liProgress.toJSON())
      }
    }
  }

  const user = new User(cleanedUserPayload)
  return db.usersDb.update((record) => record.id === user.id, () => user).then((results) => {
    Logger.debug(`[dbMigration] Updated User: ${results.updated} | Selected: ${results.selected}`)
    return true
  }).catch((error) => {
    Logger.error(`[dbMigration] Update User Failed: ${error}`)
    return false
  })
}
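The reshaping above is easier to see on a hypothetical record (all values invented): the old per-audiobook map on the user becomes two flat arrays.

// Old shape (pre-2.x):
//   userObj.audiobooks = {
//     ab_123: {
//       audiobookId: 'ab_123', totalDuration: 3600, isRead: true,
//       bookmarks: [{ time: 120, title: 'Chapter 2' }]
//     }
//   }
// After cleanUserObject:
//   user.mediaProgress = [{ id: 'ab_123', libraryItemId: 'ab_123',
//                           duration: 3600, isFinished: true, ... }]
//   user.bookmarks     = [{ time: 120, title: 'Chapter 2',
//                           libraryItemId: 'ab_123' }]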
function cleanSessionObj(db, userListeningSession) {
  var newPlaybackSession = new PlaybackSession(userListeningSession)
  newPlaybackSession.id = getId('play')
  newPlaybackSession.mediaType = 'book'
  newPlaybackSession.updatedAt = userListeningSession.lastUpdate
  newPlaybackSession.libraryItemId = userListeningSession.audiobookId
  newPlaybackSession.playMethod = PlayMethod.TRANSCODE

  // We only have title to transfer over nicely
  var bookMetadata = new BookMetadata()
  bookMetadata.title = userListeningSession.audiobookTitle || ''
  newPlaybackSession.mediaMetadata = bookMetadata

  return db.sessionsDb.update((record) => record.id === userListeningSession.id, () => newPlaybackSession).then((results) => true).catch((error) => {
    Logger.error(`[dbMigration] Update Session Failed: ${error}`)
    return false
  })
}

async function migrateUserData(db) {
  Logger.info(`==== Starting User migration ====`)

  // Libraries with previous mediaType of "podcast" moved to "book"
  // because migrating those items to podcast objects will be a nightmare
  // users will need to create a new library for podcasts
  var availableIcons = ['database', 'audiobook', 'book', 'comic', 'podcast']
  const libraries = await db.librariesDb.select((result) => (result.mediaType != 'book' || !availableIcons.includes(result.icon)))
    .then((results) => results.data.map(lib => new Library(lib)))
  if (!libraries.length) {
    Logger.info('[dbMigration] No libraries found needing migration')
  } else {
    for (const library of libraries) {
      Logger.info(`>> Migrating library "${library.name}" with media type "${library.mediaType}"`)
      await db.librariesDb.update((record) => record.id === library.id, () => library).then(() => true).catch((error) => {
        Logger.error(`[dbMigration] Update library failed: ${error}`)
        return false
      })
    }
  }

  const userObjects = await db.usersDb.select((result) => result.audiobooks != undefined).then((results) => results.data)
  if (!userObjects.length) {
    Logger.warn('[dbMigration] No users found needing migration')
    return
  }

  var userCount = 0
  for (const userObj of userObjects) {
    Logger.info(`[dbMigration] Migrating User "${userObj.username}"`)
    var success = await cleanUserObject(db, userObj)
    if (!success) {
      await new Promise((resolve) => setTimeout(resolve, 500))
      Logger.warn(`[dbMigration] Second attempt Migrating User "${userObj.username}"`)
      success = await cleanUserObject(db, userObj)
      if (!success) {
        throw new Error('Db migration failed migrating users')
      }
    }
    userCount++
  }

  var sessionCount = 0
  const userListeningSessions = await db.sessionsDb.select((result) => result.audiobookId != undefined).then((results) => results.data)
  if (userListeningSessions.length) {
    for (const session of userListeningSessions) {
      var success = await cleanSessionObj(db, session)
      if (!success) {
        await new Promise((resolve) => setTimeout(resolve, 500))
        Logger.warn(`[dbMigration] Second attempt Migrating Session "${session.id}"`)
        success = await cleanSessionObj(db, session)
        if (!success) {
          Logger.error(`[dbMigration] Failed to migrate session "${session.id}"`)
        }
      }
      if (success) sessionCount++
    }
  }

  Logger.info(`==== User migration complete (${userCount} Users, ${sessionCount} Sessions) ====`)
}

async function checkUpdateMetadataPath() {
  var bookMetadataPath = Path.posix.join(global.MetadataPath, 'books') // OLD
  if (!(await fs.pathExists(bookMetadataPath))) {
    Logger.debug(`[dbMigration] No need to update books metadata path`)
    return
  }
  var itemsMetadataPath = Path.posix.join(global.MetadataPath, 'items')
  await fs.rename(bookMetadataPath, itemsMetadataPath)
  Logger.info(`>>> Renamed metadata dir from /metadata/books to /metadata/items`)
}

module.exports.migrate = async (db) => {
  await checkUpdateMetadataPath()
  // Before DB Load clean data
  await migrateUserData(db)
  await db.init()
  // After DB Load
  await migrateLibraryItems(db)
  // TODO: Eventually remove audiobooks db when stable
}
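For context, a sketch of how this removed entry point could be driven at startup. The gating check and module path here are assumptions for illustration, not code from this commit.

const fs = require('fs')
const Path = require('path')
const dbMigration = require('./utils/dbMigration') // assumed module path

async function start(db) {
  // Hypothetical check: the old njodb 'audiobooks' collection only exists
  // on installs that predate the library-item model.
  const hasLegacyData = fs.existsSync(Path.join(global.ConfigPath, 'audiobooks'))
  if (hasLegacyData) {
    await dbMigration.migrate(db) // migrates users, calls db.init(), then items
  } else {
    await db.init()
  }
}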