// Source: audiobookshelf/server/libs/njodb/njodb.js
//
// Repository viewer metadata captured with the file:
//   724 lines, 19 KiB, JavaScript
//   (viewer links: Raw / Permalink / Normal View / History)

"use strict";
const {
appendFile,
appendFileSync,
createReadStream,
createWriteStream,
readFileSync,
readdir,
readdirSync,
stat,
statSync,
writeFile
} = require("graceful-fs");
const {
join,
resolve
} = require("path");
const { createInterface } = require("readline");
const { promisify } = require("util");
const {
check,
checkSync,
lock,
lockSync
} = require("../properLockfile");
const {
deleteFile,
deleteFileSync,
deleteDirectory,
deleteDirectorySync,
fileExists,
fileExistsSync,
moveFile,
moveFileSync,
releaseLock,
releaseLockSync,
replaceFile,
replaceFileSync
} = require("./utils");
const {
Handler,
Randomizer,
Result
} = require("./objects");
/**
 * Select the file names that follow the datastore naming scheme
 * `<dataname>.<digits>.json`.
 *
 * The data name and the separating dots are matched literally. Regular
 * expression metacharacters in `dataname` are escaped, so a name such as
 * `my.db` or `a+b` only matches files that really start with that text.
 *
 * @param {Iterable<string>} files - Candidate file names (no directory part).
 * @param {string} dataname - Base name shared by all datastore files.
 * @returns {string[]} The matching file names, in their original order.
 */
const filterStoreNames = (files, dataname) => {
    // A null/undefined name is treated as empty, as Array.prototype.join did before.
    const literalName = (dataname == null ? "" : String(dataname))
        .replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
    const re = new RegExp("^" + literalName + "\\.\\d+\\.json$");
    const storenames = [];
    for (const file of files) {
        if (re.test(file)) storenames.push(file);
    }
    return storenames;
};
/**
 * List the datastore files found in a directory (asynchronous).
 *
 * @param {string} datapath - Directory to read.
 * @param {string} dataname - Base name handed to `filterStoreNames`.
 * @returns {Promise<string[]>} Directory entries accepted by `filterStoreNames`.
 *   Rejects if the directory cannot be read.
 */
const getStoreNames = async (datapath, dataname) => {
    const listDirectory = promisify(readdir);
    const entries = await listDirectory(datapath);
    return filterStoreNames(entries, dataname);
};
/**
 * List the datastore files found in a directory (synchronous).
 *
 * @param {string} datapath - Directory to read.
 * @param {string} dataname - Base name handed to `filterStoreNames`.
 * @returns {string[]} Directory entries accepted by `filterStoreNames`.
 * @throws If the directory cannot be read.
 */
const getStoreNamesSync = (datapath, dataname) => {
    const entries = readdirSync(datapath);
    return filterStoreNames(entries, dataname);
};
// Database management
/**
 * Collect statistics for one store file (asynchronous).
 *
 * The store is locked, streamed line by line through a "stats" handler, and
 * unlocked again if `check` still reports it as locked. File size and
 * timestamps are read with `stat` after the lock has been released.
 *
 * @param {string} store - Path of the store file.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} The handler's result merged with `store`
 *   (absolute path), `size`, `created`, `modified` and `end` (ms timestamp).
 */
const statsStoreData = async (store, lockoptions) => {
    const release = await lock(store, lockoptions);

    const handlerResults = await new Promise((fulfil, fail) => {
        const lines = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
        const handler = Handler("stats");
        lines.on("line", (record) => {
            handler.next(record);
        });
        lines.on("close", () => {
            fulfil(handler.return());
        });
        lines.on("error", (error) => {
            fail(error);
        });
    });

    const stillLocked = await check(store, lockoptions);
    if (stillLocked) {
        await releaseLock(store, release);
    }

    // `resolve` here is path.resolve; the promise callbacks above use their own names.
    const results = Object.assign({ store: resolve(store) }, handlerResults);
    const fileStats = await promisify(stat)(store);
    results.size = fileStats.size;
    results.created = fileStats.birthtime;
    results.modified = fileStats.mtime;
    results.end = Date.now();
    return results;
};
/**
 * Collect statistics for one store file (synchronous).
 *
 * The whole file is read while the lock is held and then split on "\n".
 * Every piece, including the empty one after a trailing newline, is passed
 * to the "stats" handler.
 *
 * @param {string} store - Path of the store file.
 * @returns {object} The handler's result merged with `store` (absolute path),
 *   `size`, `created`, `modified` and `end` (ms timestamp).
 */
const statsStoreDataSync = (store) => {
    const release = lockSync(store);
    const contents = readFileSync(store, "utf8");
    if (checkSync(store)) {
        releaseLockSync(store, release);
    }

    const handler = Handler("stats");
    contents.split("\n").forEach((record) => {
        handler.next(record);
    });

    const results = Object.assign({ store: resolve(store) }, handler.return());
    const fileStats = statSync(store);
    results.size = fileStats.size;
    results.created = fileStats.birthtime;
    results.modified = fileStats.mtime;
    results.end = Date.now();
    return results;
};
/**
 * Redistribute all records of the existing stores over
 * `properties.datastores` new store files (asynchronous).
 *
 * Steps: lock every existing store, create one empty temp store per target,
 * stream every existing store line by line into the temp stores (target
 * chosen by `Randomizer`), wait until every temp store is flushed, delete
 * the old stores, release the locks, and move the temp stores into
 * `properties.datapath` as `<dataname>.<n>.json`.
 *
 * Lines that parse as JSON are re-serialised and counted in
 * `results.records`. Lines that do not parse are recorded in
 * `results.errors` and still copied unchanged.
 *
 * @param {object} properties
 * @param {string[]} properties.storenames - File names of the current stores.
 * @param {string} properties.datapath - Directory holding the stores.
 * @param {string} properties.temppath - Directory for the temp stores.
 * @param {string} properties.dataname - Base name of the store files.
 * @param {number} properties.datastores - Number of stores to create.
 * @param {object} properties.lockoptions - Options passed to `lock`.
 * @returns {Promise<object>} The `Result("distribute")` object completed with
 *   `stores`, `end` and `elapsed`.
 */
const distributeStoreData = async (properties) => {
    const results = Result("distribute");

    const storepaths = [];
    const locks = [];
    for (const storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        locks.push(lock(storepath, properties.lockoptions));
    }
    const releases = await Promise.all(locks);

    const tempstorepaths = [];
    const writers = [];
    for (let i = 0; i < properties.datastores; i++) {
        const tempstorepath = join(properties.temppath, [properties.dataname, i, results.start, "json"].join("."));
        tempstorepaths.push(tempstorepath);
        // Create the file first: the "r+" flag requires it to exist.
        await promisify(writeFile)(tempstorepath, "");
        writers.push(createWriteStream(tempstorepath, { flags: "r+" }));
    }

    const reads = storepaths.map((storepath) => new Promise((resolve, reject) => {
        let line = 0;
        const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);
        const reader = createInterface({ input: createReadStream(storepath), crlfDelay: Infinity });
        reader.on("line", (record) => {
            const storenumber = randomizer.next();
            line++;
            try {
                record = JSON.stringify(JSON.parse(record));
                results.records++;
            } catch {
                results.errors.push({ line: line, data: record });
            } finally {
                // Unparseable lines are kept as-is so no data is dropped.
                writers[storenumber].write(record + "\n");
            }
        });
        reader.on("close", () => resolve(true));
        reader.on("error", (error) => reject(error));
    }));
    await Promise.all(reads);

    // Wait for every temp store to be flushed before the originals are
    // deleted; otherwise buffered records could be lost or moved half-written.
    await Promise.all(writers.map((writer) => new Promise((resolve, reject) => {
        writer.on("error", reject);
        writer.on("finish", resolve);
        writer.end();
    })));

    await Promise.all(storepaths.map((storepath) => deleteFile(storepath)));

    // Await the releases so a failure surfaces here instead of as an unhandled rejection.
    await Promise.all(releases.map((release) => release()));

    await Promise.all(tempstorepaths.map((tempstorepath, i) =>
        moveFile(tempstorepath, join(properties.datapath, [properties.dataname, i, "json"].join(".")))
    ));

    results.stores = tempstorepaths.length;
    results.end = Date.now();
    results.elapsed = results.end - results.start;
    return results;
};
/**
 * Redistribute all records of the existing stores over new store files
 * (synchronous).
 *
 * Every existing store is locked and read completely. Record `i` of the
 * concatenated data goes into bucket `i % properties.datastores`. Each bucket
 * is appended to a temp store whose number comes from `Randomizer`; then the
 * old stores are deleted, the locks released, and the temp stores moved into
 * `properties.datapath` as `<dataname>.<n>.json`.
 *
 * Lines that fail to parse are recorded in `results.errors` (with their
 * zero-based index in the concatenated data) and still copied unchanged.
 *
 * @param {object} properties - Same fields as for `distributeStoreData`
 *   (`storenames`, `datapath`, `temppath`, `dataname`, `datastores`).
 * @returns {object} The `Result("distribute")` object completed with
 *   `stores`, `end` and `elapsed`.
 */
const distributeStoreDataSync = (properties) => {
    const results = Result("distribute");

    const storepaths = [];
    const releases = [];
    let lines = [];
    for (const storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        releases.push(lockSync(storepath));
        const contents = readFileSync(storepath, "utf8").trimEnd();
        if (contents.length > 0) {
            lines = lines.concat(contents.split("\n"));
        }
    }

    // One newline-terminated string per target store.
    const buckets = [];
    lines.forEach((line, index) => {
        let normalized = line;
        try {
            normalized = JSON.stringify(JSON.parse(line));
            results.records++;
        } catch (error) {
            results.errors.push({ line: index, data: line });
        }
        const slot = index % properties.datastores;
        // First record for this slot: start from an empty string.
        if (index === slot) buckets[index] = "";
        buckets[slot] += normalized + "\n";
    });

    const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);
    const tempstorepaths = [];
    for (const bucket of buckets) {
        const storenumber = randomizer.next();
        const tempstorepath = join(
            properties.temppath,
            [properties.dataname, storenumber, results.start, "json"].join(".")
        );
        tempstorepaths.push(tempstorepath);
        appendFileSync(tempstorepath, bucket);
    }

    storepaths.forEach((storepath) => {
        deleteFileSync(storepath);
    });
    releases.forEach((release) => {
        release();
    });
    tempstorepaths.forEach((tempstorepath, index) => {
        moveFileSync(tempstorepath, join(properties.datapath, [properties.dataname, index, "json"].join(".")));
    });

    results.stores = tempstorepaths.length;
    results.end = Date.now();
    results.elapsed = results.end - results.start;
    return results;
};
/**
 * Delete every store file, the temp and data directories, and the
 * `njodb.properties` file (asynchronous).
 *
 * All stores are locked before any of them is deleted. The lock releases are
 * awaited before the directories are removed, so directory deletion does not
 * run while a release is still in flight.
 *
 * @param {object} properties
 * @param {string[]} properties.storenames - File names of the stores.
 * @param {string} properties.datapath - Directory holding the stores.
 * @param {string} properties.temppath - Temp directory.
 * @param {string} properties.root - Directory containing `njodb.properties`.
 * @param {object} properties.lockoptions - Options passed to `lock`.
 * @returns {Promise<Array>} Return values of the delete helpers: one per
 *   store, then temp directory, data directory, properties file.
 */
const dropEverything = async (properties) => {
    const storepaths = [];
    const locks = [];
    for (const storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        locks.push(lock(storepath, properties.lockoptions));
    }
    const releases = await Promise.all(locks);

    const storeResults = await Promise.all(storepaths.map((storepath) => deleteFile(storepath)));

    // Finish releasing before touching the directories.
    await Promise.all(releases.map((release) => release()));

    const otherResults = await Promise.all([
        deleteDirectory(properties.temppath),
        deleteDirectory(properties.datapath),
        deleteFile(join(properties.root, "njodb.properties"))
    ]);
    return storeResults.concat(otherResults);
};
/**
 * Delete every store file, the temp and data directories, and the
 * `njodb.properties` file (synchronous).
 *
 * Order of operations: lock all stores, delete all stores, release all
 * locks, then remove the temp directory, the data directory and the
 * properties file.
 *
 * @param {object} properties - Uses `storenames`, `datapath`, `temppath`
 *   and `root`.
 * @returns {Array} Return values of the delete helpers, in call order.
 */
const dropEverythingSync = (properties) => {
    const storepaths = [];
    const releases = [];
    for (const storename of properties.storenames) {
        const storepath = join(properties.datapath, storename);
        storepaths.push(storepath);
        releases.push(lockSync(storepath));
    }

    const results = storepaths.map((storepath) => deleteFileSync(storepath));

    releases.forEach((release) => {
        release();
    });

    results.push(
        deleteDirectorySync(properties.temppath),
        deleteDirectorySync(properties.datapath),
        deleteFileSync(join(properties.root, "njodb.properties"))
    );
    return results;
};
// Data manipulation
/**
 * Append newline-terminated records to a store file (asynchronous).
 *
 * The store is locked only when it already exists; `appendFile` creates it
 * otherwise. After the write the lock is released if `check` reports the
 * store as locked.
 *
 * @param {string} store - Path of the store file.
 * @param {string} data - Records to append, one per line.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} `Result("insert")` merged with `store` (absolute
 *   path), `inserted` (number of "\n" in `data`) and `end`.
 */
const insertStoreData = async (store, data, lockoptions) => {
    const results = Object.assign({ store: resolve(store) }, Result("insert"));

    const release = (await fileExists(store)) ? await lock(store, lockoptions) : undefined;
    await promisify(appendFile)(store, data, "utf8");

    const locked = await check(store, lockoptions);
    if (locked) {
        await releaseLock(store, release);
    }

    if (data.length > 0) {
        results.inserted = data.split("\n").length - 1;
    } else {
        results.inserted = 0;
    }
    results.end = Date.now();
    return results;
};
/**
 * Append newline-terminated records to a store file (synchronous).
 *
 * The store is locked only when it already exists; `appendFileSync` creates
 * it otherwise. After the write the lock is released if `checkSync` reports
 * the store as locked.
 *
 * @param {string} store - Path of the store file.
 * @param {string} data - Records to append, one per line.
 * @returns {object} `Result("insert")` merged with `store` (absolute path),
 *   `inserted` (number of "\n" in `data`) and `end`.
 */
const insertStoreDataSync = (store, data) => {
    const results = Object.assign({ store: resolve(store) }, Result("insert"));

    const release = fileExistsSync(store) ? lockSync(store) : undefined;
    appendFileSync(store, data, "utf8");

    if (checkSync(store)) {
        releaseLockSync(store, release);
    }

    if (data.length > 0) {
        results.inserted = data.split("\n").length - 1;
    } else {
        results.inserted = 0;
    }
    results.end = Date.now();
    return results;
};
/**
 * Import the records of a line-delimited JSON file into the existing stores
 * (asynchronous).
 *
 * Every store is locked first; only then are the write streams opened, in
 * append mode, so existing records are kept. (Opening with "r+" and no start
 * offset writes from byte 0 and overwrites the beginning of the store.)
 * Each input line is trimmed and assigned a store by `Randomizer`:
 *   - blank lines are counted in `results.blanks` and skipped;
 *   - lines that parse as JSON are re-serialised and counted in
 *     `results.inserted`;
 *   - lines that fail to parse are recorded in `results.errors` and still
 *     written, JSON-encoded as a string.
 * The function waits for all streams to be flushed before it releases the
 * locks.
 *
 * @param {string} file - Path of the input file.
 * @param {string} datapath - Directory holding the stores.
 * @param {string[]} storenames - File names of the stores.
 * @param {object} lockoptions - Options passed to `lock`.
 * @returns {Promise<object>} The `Result("insertFile")` object completed
 *   with `end` and `elapsed`.
 */
const insertFileData = async (file, datapath, storenames, lockoptions) => {
    const results = Result("insertFile");
    const datastores = storenames.length;

    const storepaths = [];
    const locks = [];
    for (const storename of storenames) {
        const storepath = join(datapath, storename);
        storepaths.push(storepath);
        locks.push(lock(storepath, lockoptions));
    }
    const releases = await Promise.all(locks);

    // Open the writers only while the locks are held, and append instead of overwriting.
    const writers = storepaths.map((storepath) => createWriteStream(storepath, { flags: "a" }));

    await new Promise((resolve, reject) => {
        const randomizer = Randomizer(Array.from(Array(datastores).keys()), false);
        const reader = createInterface({ input: createReadStream(file), crlfDelay: Infinity });
        reader.on("line", (record) => {
            record = record.trim();
            const storenumber = randomizer.next();
            results.lines++;
            if (record.length > 0) {
                try {
                    record = JSON.parse(record);
                    results.inserted++;
                } catch (error) {
                    results.errors.push({ error: error.message, line: results.lines, data: record });
                } finally {
                    writers[storenumber].write(JSON.stringify(record) + "\n");
                }
            } else {
                results.blanks++;
            }
        });
        reader.on("close", () => resolve(true));
        reader.on("error", (error) => reject(error));
    });

    // Flush every store before the locks are given up.
    await Promise.all(writers.map((writer) => new Promise((resolve, reject) => {
        writer.on("error", reject);
        writer.on("finish", resolve);
        writer.end();
    })));

    await Promise.all(releases.map((release) => release()));

    results.end = Date.now();
    results.elapsed = results.end - results.start;
    return results;
};
/**
 * Run a select over one store file (asynchronous).
 *
 * The store is locked, streamed line by line through a "select" handler
 * built from `match` and `project`, and unlocked again if `check` still
 * reports it as locked.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} project - Passed through to `Handler`.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} The handler's result merged with `store`
 *   (the path exactly as given).
 */
const selectStoreData = async (store, match, project, lockoptions) => {
    const release = await lock(store, lockoptions);

    const handlerResults = await new Promise((fulfil, fail) => {
        const lines = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
        const handler = Handler("select", match, project);
        lines.on("line", (record) => {
            handler.next(record);
        });
        lines.on("close", () => {
            fulfil(handler.return());
        });
        lines.on("error", (error) => {
            fail(error);
        });
    });

    const stillLocked = await check(store, lockoptions);
    if (stillLocked) {
        await releaseLock(store, release);
    }

    return Object.assign({ store: store }, handlerResults);
};
/**
 * Run a select over one store file (synchronous).
 *
 * The whole file is read while the lock is held and then split on "\n".
 * Every piece, including the empty one after a trailing newline, is passed
 * to the "select" handler.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} project - Passed through to `Handler`.
 * @returns {object} The handler's result merged with `store` (the path
 *   exactly as given).
 */
const selectStoreDataSync = (store, match, project) => {
    const release = lockSync(store);
    const contents = readFileSync(store, "utf8");
    if (checkSync(store)) {
        releaseLockSync(store, release);
    }

    const handler = Handler("select", match, project);
    contents.split("\n").forEach((record) => {
        handler.next(record);
    });

    return Object.assign({ store: store }, handler.return());
};
/**
 * Apply an update to one store file (asynchronous).
 *
 * While the store is locked, every line is passed to an "update" handler
 * together with a write stream on `tempstore`. The promise resolves only
 * after that stream has emitted "finish", so the temp store is completely
 * written before it replaces the store (or is deleted when nothing was
 * updated).
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} update - Passed through to `Handler`.
 * @param {string} tempstore - Path of the temp file receiving the new content.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} The handler's result merged with `store`,
 *   `tempstore` and `end`; `data` and `records` are removed. If
 *   `replaceFile` reports failure, `updated` is reset to 0 and `errors`
 *   holds the affected records.
 */
const updateStoreData = async (store, match, update, tempstore, lockoptions) => {
    const release = await lock(store, lockoptions);

    const handlerResults = await new Promise((resolve, reject) => {
        const writer = createWriteStream(tempstore);
        const handler = Handler("update", match, update);
        writer.on("open", () => {
            // Create the reader only after the writer is open; otherwise the
            // reader can open and close before the writer is ready.
            const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
            reader.on("line", (record) => {
                handler.next(record, writer);
            });
            reader.on("close", () => {
                writer.end();
            });
            reader.on("error", (error) => reject(error));
        });
        // Resolve on "finish" rather than on reader close: the temp store
        // must be flushed before it is moved or deleted below.
        writer.on("finish", () => resolve(handler.return()));
        writer.on("error", (error) => reject(error));
    });

    const results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);

    if (results.updated > 0) {
        if (!await replaceFile(store, tempstore)) {
            results.errors = [...results.records];
            results.updated = 0;
        }
    } else {
        await deleteFile(tempstore);
    }

    if (await check(store, lockoptions)) await releaseLock(store, release);

    results.end = Date.now();
    delete results.data;
    delete results.records;
    return results;
};
/**
 * Apply an update to one store file (synchronous).
 *
 * The store is read (trailing whitespace trimmed) while locked, every line
 * is passed to an "update" handler, and if anything was updated the
 * handler's `data` is written to `tempstore`, which then replaces the store.
 *
 * The update counts as failed when EITHER writing the temp store OR replacing
 * the store fails: `updated` is then reset to 0 and `errors` holds the
 * affected records.
 *
 * NOTE(review): the lock is released right after the read, so the replace
 * runs unlocked, unlike the async variant. Confirm whether that is intended.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} update - Passed through to `Handler`.
 * @param {string} tempstore - Path of the temp file receiving the new content.
 * @returns {object} The handler's result merged with `store`, `tempstore`
 *   and `end`; `data` and `records` are removed.
 */
const updateStoreDataSync = (store, match, update, tempstore) => {
    const release = lockSync(store);
    const file = readFileSync(store, "utf8").trimEnd();
    if (checkSync(store)) releaseLockSync(store, release);

    const handler = Handler("update", match, update);
    for (const record of file.split("\n")) {
        handler.next(record);
    }

    const results = Object.assign({ store: store, tempstore: tempstore }, handler.return());

    if (results.updated > 0) {
        let append = false;
        let replace = false;
        try {
            appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
            append = true;
        } catch {
            append = false;
        }
        if (append) replace = replaceFileSync(store, tempstore);
        // Both steps must succeed; a failed replace must not be reported as success.
        if (!(append && replace)) {
            results.errors = [...results.records];
            results.updated = 0;
        }
    }

    results.end = Date.now();
    delete results.data;
    delete results.records;
    return results;
};
/**
 * Delete matching records from one store file (asynchronous).
 *
 * While the store is locked, every line is passed to a "delete" handler
 * together with a write stream on `tempstore`. The promise resolves only
 * after that stream has emitted "finish", so the temp store is completely
 * written before it replaces the store (or is deleted when nothing was
 * deleted).
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {string} tempstore - Path of the temp file receiving the new content.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} The handler's result merged with `store`,
 *   `tempstore` and `end`; `data` and `records` are removed. If
 *   `replaceFile` reports failure, `deleted` is reset to 0 and `errors`
 *   holds the affected records.
 */
const deleteStoreData = async (store, match, tempstore, lockoptions) => {
    const release = await lock(store, lockoptions);

    const handlerResults = await new Promise((resolve, reject) => {
        const writer = createWriteStream(tempstore);
        const handler = Handler("delete", match);
        writer.on("open", () => {
            // Create the reader after the writer opens; otherwise the reader
            // can sometimes close before the writer opens.
            const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
            reader.on("line", (record) => {
                handler.next(record, writer);
            });
            reader.on("close", () => {
                writer.end();
            });
            reader.on("error", (error) => reject(error));
        });
        // Resolve on "finish" rather than on reader close: the temp store
        // must be flushed before it is moved or deleted below.
        writer.on("finish", () => resolve(handler.return()));
        writer.on("error", (error) => reject(error));
    });

    const results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);

    if (results.deleted > 0) {
        if (!await replaceFile(store, tempstore)) {
            results.errors = [...results.records];
            results.deleted = 0;
        }
    } else {
        await deleteFile(tempstore);
    }

    if (await check(store, lockoptions)) await releaseLock(store, release);

    results.end = Date.now();
    delete results.data;
    delete results.records;
    return results;
};
/**
 * Delete matching records from one store file (synchronous).
 *
 * The store is read while locked, every "\n"-separated piece is passed to a
 * "delete" handler, and if anything was deleted the handler's `data` is
 * written to `tempstore`, which then replaces the store.
 *
 * The delete counts as failed when EITHER writing the temp store OR replacing
 * the store fails: `deleted` is then reset to 0 and `errors` holds the
 * affected records.
 *
 * NOTE(review): the lock is released right after the read, so the replace
 * runs unlocked, unlike the async variant. Confirm whether that is intended.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {string} tempstore - Path of the temp file receiving the new content.
 * @returns {object} The handler's result merged with `store`, `tempstore`
 *   and `end`; `data` and `records` are removed.
 */
const deleteStoreDataSync = (store, match, tempstore) => {
    const release = lockSync(store);
    const file = readFileSync(store, "utf8");
    if (checkSync(store)) releaseLockSync(store, release);

    const handler = Handler("delete", match);
    for (const record of file.split("\n")) {
        handler.next(record);
    }

    const results = Object.assign({ store: store, tempstore: tempstore }, handler.return());

    if (results.deleted > 0) {
        let append = false;
        let replace = false;
        try {
            appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
            append = true;
        } catch {
            append = false;
        }
        if (append) replace = replaceFileSync(store, tempstore);
        // Both steps must succeed; a failed replace must not be reported as success.
        if (!(append && replace)) {
            results.errors = [...results.records];
            // Reset the counter this operation reports (`deleted`, not `updated`).
            results.deleted = 0;
        }
    }

    results.end = Date.now();
    delete results.data;
    delete results.records;
    return results;
};
/**
 * Run an aggregation over one store file (asynchronous).
 *
 * The store is locked, streamed line by line through an "aggregate" handler
 * built from `match`, `index` and `project`, and unlocked again if `check`
 * still reports it as locked. The release is awaited, so the lock is gone
 * (or the failure is reported) by the time the returned promise settles.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} index - Passed through to `Handler`.
 * @param {*} project - Passed through to `Handler`.
 * @param {object} lockoptions - Options passed to `lock` and `check`.
 * @returns {Promise<object>} The handler's result merged with `store`
 *   (the path exactly as given).
 */
const aggregateStoreData = async (store, match, index, project, lockoptions) => {
    const release = await lock(store, lockoptions);

    const handlerResults = await new Promise((resolve, reject) => {
        const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
        const handler = Handler("aggregate", match, index, project);
        reader.on("line", (record) => handler.next(record));
        reader.on("close", () => resolve(handler.return()));
        reader.on("error", (error) => reject(error));
    });

    // Await the release like every sibling function does; a floating promise
    // would let callers continue while the store is still locked.
    if (await check(store, lockoptions)) await releaseLock(store, release);

    return Object.assign({ store: store }, handlerResults);
};
/**
 * Run an aggregation over one store file (synchronous).
 *
 * The whole file is read while the lock is held and then split on "\n".
 * Every piece, including the empty one after a trailing newline, is passed
 * to the "aggregate" handler.
 *
 * @param {string} store - Path of the store file.
 * @param {*} match - Passed through to `Handler`.
 * @param {*} index - Passed through to `Handler`.
 * @param {*} project - Passed through to `Handler`.
 * @returns {object} The handler's result merged with `store` (the path
 *   exactly as given).
 */
const aggregateStoreDataSync = (store, match, index, project) => {
    const release = lockSync(store);
    const contents = readFileSync(store, "utf8");
    if (checkSync(store)) {
        releaseLockSync(store, release);
    }

    const handler = Handler("aggregate", match, index, project);
    contents.split("\n").forEach((record) => {
        handler.next(record);
    });

    return Object.assign({ store: store }, handler.return());
};
// Public API of this module. Functions with a `Sync` suffix are the
// synchronous counterparts of the promise-returning functions of the same
// name; `insertFileData` has no synchronous counterpart.

// Store discovery
exports.getStoreNames = getStoreNames;
exports.getStoreNamesSync = getStoreNamesSync;
// Database management
exports.statsStoreData = statsStoreData;
exports.statsStoreDataSync = statsStoreDataSync;
exports.distributeStoreData = distributeStoreData;
exports.distributeStoreDataSync = distributeStoreDataSync;
exports.dropEverything = dropEverything;
exports.dropEverythingSync = dropEverythingSync;
// Data manipulation
exports.insertStoreData = insertStoreData;
exports.insertStoreDataSync = insertStoreDataSync;
exports.insertFileData = insertFileData;
exports.selectStoreData = selectStoreData;
exports.selectStoreDataSync = selectStoreDataSync;
exports.updateStoreData = updateStoreData;
exports.updateStoreDataSync = updateStoreDataSync;
exports.deleteStoreData = deleteStoreData;
exports.deleteStoreDataSync = deleteStoreDataSync;
exports.aggregateStoreData = aggregateStoreData;
exports.aggregateStoreDataSync = aggregateStoreDataSync;