mirror of
https://github.com/Unleash/unleash.git
synced 2025-01-20 00:08:02 +01:00
feat: postgres lock (#3443)
This commit is contained in:
parent
06b969a139
commit
8654c9ea42
@ -161,6 +161,7 @@
|
||||
"@types/node": "16.18.21",
|
||||
"@types/nodemailer": "6.4.7",
|
||||
"@types/owasp-password-strength-test": "1.3.0",
|
||||
"@types/pg": "8.6.6",
|
||||
"@types/semver": "7.3.13",
|
||||
"@types/stoppable": "1.1.1",
|
||||
"@types/supertest": "2.0.12",
|
||||
|
@ -79,6 +79,7 @@ exports[`should create default config 1`] = `
|
||||
"loginHistory": false,
|
||||
"maintenanceMode": false,
|
||||
"messageBanner": false,
|
||||
"migrationLock": false,
|
||||
"newProjectOverview": false,
|
||||
"optimal304": false,
|
||||
"optimal304Differ": false,
|
||||
@ -105,6 +106,7 @@ exports[`should create default config 1`] = `
|
||||
"loginHistory": false,
|
||||
"maintenanceMode": false,
|
||||
"messageBanner": false,
|
||||
"migrationLock": false,
|
||||
"newProjectOverview": false,
|
||||
"optimal304": false,
|
||||
"optimal304Differ": false,
|
||||
|
@ -1,7 +1,7 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import express from 'express';
|
||||
import { createTestConfig } from '../test/config/test-config';
|
||||
import { start, create } from './server-impl';
|
||||
import FakeEventStore from '../test/fixtures/fake-event-store';
|
||||
|
||||
jest.mock(
|
||||
'./routes',
|
||||
@ -15,7 +15,7 @@ jest.mock(
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
const eventStore = new EventEmitter();
|
||||
const eventStore = new FakeEventStore();
|
||||
const settingStore = {
|
||||
get: () => {
|
||||
Promise.resolve('secret');
|
||||
@ -54,6 +54,10 @@ jest.mock('../migrator', () => ({
|
||||
migrateDb: () => Promise.resolve(),
|
||||
}));
|
||||
|
||||
jest.mock('./util/db-lock', () => ({
|
||||
withDbLock: () => (fn) => fn,
|
||||
}));
|
||||
|
||||
jest.mock(
|
||||
'./util/version',
|
||||
() =>
|
||||
@ -123,5 +127,5 @@ test('should shutdown the server when calling stop()', async () => {
|
||||
createTestConfig({ server: { port: 0 } }),
|
||||
);
|
||||
await stop();
|
||||
expect(server.address()).toBe(null);
|
||||
expect(server!.address()).toBe(null);
|
||||
});
|
||||
|
@ -32,6 +32,7 @@ import { Knex } from 'knex';
|
||||
import * as permissions from './types/permissions';
|
||||
import * as eventType from './types/events';
|
||||
import { Db } from './db/db';
|
||||
import { defaultLockKey, defaultTimeout, withDbLock } from './util/db-lock';
|
||||
|
||||
async function createApp(
|
||||
config: IUnleashConfig,
|
||||
@ -138,7 +139,18 @@ async function start(opts: IUnleashOptions = {}): Promise<IUnleash> {
|
||||
logger.info('DB migration: disabled');
|
||||
} else {
|
||||
logger.debug('DB migration: start');
|
||||
await migrateDb(config);
|
||||
if (opts.flagResolver?.isEnabled('migrationLock')) {
|
||||
logger.info('Running migration with lock');
|
||||
const lock = withDbLock(config.db, {
|
||||
lockKey: defaultLockKey,
|
||||
timeout: defaultTimeout,
|
||||
logger,
|
||||
});
|
||||
await lock(migrateDb)(config);
|
||||
} else {
|
||||
await migrateDb(config);
|
||||
}
|
||||
|
||||
logger.debug('DB migration: end');
|
||||
}
|
||||
} catch (err) {
|
||||
|
@ -77,6 +77,7 @@ const flags = {
|
||||
process.env.UNLEASH_EXPERIMENTAL_OPTIMAL_304_DIFFER,
|
||||
false,
|
||||
),
|
||||
migrationLock: parseEnvVarBoolean(process.env.MIGRATION_LOCK, false),
|
||||
};
|
||||
|
||||
export const defaultExperimentalOptions: IExperimentalOptions = {
|
||||
|
69
src/lib/util/db-lock.test.ts
Normal file
69
src/lib/util/db-lock.test.ts
Normal file
@ -0,0 +1,69 @@
|
||||
import { withDbLock } from './db-lock';
|
||||
import { getDbConfig } from '../../test/e2e/helpers/database-config';
|
||||
import { IDBOption } from '../types';
|
||||
import { Logger } from '../logger';
|
||||
|
||||
test('should lock access to any action', async () => {
|
||||
const lock = withDbLock(getDbConfig() as IDBOption);
|
||||
|
||||
const asyncAction = (input: string) => Promise.resolve(`result: ${input}`);
|
||||
|
||||
const result = await lock(asyncAction)('data');
|
||||
|
||||
expect(result).toBe('result: data');
|
||||
});
|
||||
|
||||
const ms = (millis: number) =>
|
||||
new Promise((resolve) => {
|
||||
setTimeout(() => resolve('time'), millis);
|
||||
});
|
||||
|
||||
test('should await other actions on lock', async () => {
|
||||
const lock = withDbLock(getDbConfig() as IDBOption);
|
||||
|
||||
const results: string[] = [];
|
||||
const slowAsyncAction = (input: string) => {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
results.push(input);
|
||||
resolve(input);
|
||||
}, 200);
|
||||
});
|
||||
};
|
||||
const fastAction = async (input: string) => {
|
||||
results.push(input);
|
||||
};
|
||||
|
||||
const lockedAction = lock(slowAsyncAction);
|
||||
const lockedAnotherAction = lock(fastAction);
|
||||
|
||||
// deliberately skipped await to simulate another server running slow operation
|
||||
lockedAction('first');
|
||||
await ms(100); // start fast action after slow action established DB connection
|
||||
await lockedAnotherAction('second');
|
||||
|
||||
await expect(results).toStrictEqual(['first', 'second']);
|
||||
});
|
||||
|
||||
test('should handle lock timeout', async () => {
|
||||
const timeoutMs = 1;
|
||||
let loggedError = '';
|
||||
const lock = withDbLock(getDbConfig() as IDBOption, {
|
||||
lockKey: 1,
|
||||
timeout: timeoutMs,
|
||||
logger: {
|
||||
error(msg: string) {
|
||||
loggedError = msg;
|
||||
},
|
||||
} as unknown as Logger,
|
||||
});
|
||||
|
||||
// the query should fail because of the timeout. This one is a fallback when timeout
|
||||
// was not triggered in the integration test
|
||||
const asyncAction = () => Promise.reject(new Error('Query read timeout'));
|
||||
|
||||
await expect(lock(asyncAction)()).rejects.toStrictEqual(
|
||||
new Error('Query read timeout'),
|
||||
);
|
||||
expect(loggedError).toBe('Locking error: Query read timeout');
|
||||
});
|
43
src/lib/util/db-lock.ts
Normal file
43
src/lib/util/db-lock.ts
Normal file
@ -0,0 +1,43 @@
|
||||
import { Client } from 'pg';
|
||||
import { IDBOption } from '../types';
|
||||
import { Logger } from '../logger';
|
||||
|
||||
export const defaultLockKey = 479341;
|
||||
export const defaultTimeout = 5000;
|
||||
|
||||
interface IDbLockOptions {
|
||||
timeout: number;
|
||||
lockKey: number;
|
||||
logger: Logger;
|
||||
}
|
||||
|
||||
const defaultOptions: IDbLockOptions = {
|
||||
timeout: defaultTimeout,
|
||||
lockKey: defaultLockKey,
|
||||
logger: { ...console, fatal: console.error },
|
||||
};
|
||||
|
||||
export const withDbLock =
|
||||
(dbConfig: IDBOption, config = defaultOptions) =>
|
||||
<A extends any[], R>(fn: (...args: A) => Promise<R>) =>
|
||||
async (...args: A): Promise<R> => {
|
||||
const client = new Client({
|
||||
...dbConfig,
|
||||
query_timeout: config.timeout,
|
||||
});
|
||||
try {
|
||||
await client.connect();
|
||||
// wait to obtain a lock
|
||||
await client.query('SELECT pg_advisory_lock($1)', [config.lockKey]);
|
||||
const result = await fn(...args);
|
||||
return result;
|
||||
} catch (e) {
|
||||
config.logger.error(`Locking error: ${e.message}`);
|
||||
throw e;
|
||||
} finally {
|
||||
await client.query('SELECT pg_advisory_unlock($1)', [
|
||||
config.lockKey,
|
||||
]);
|
||||
await client.end();
|
||||
}
|
||||
};
|
4
src/test/fixtures/fake-event-store.ts
vendored
4
src/test/fixtures/fake-event-store.ts
vendored
@ -16,7 +16,7 @@ class FakeEventStore implements IEventStore {
|
||||
}
|
||||
|
||||
getMaxRevisionId(): Promise<number> {
|
||||
throw new Error('Method not implemented.');
|
||||
return Promise.resolve(1);
|
||||
}
|
||||
|
||||
store(event: IEvent): Promise<void> {
|
||||
@ -64,7 +64,7 @@ class FakeEventStore implements IEventStore {
|
||||
}
|
||||
|
||||
async get(key: number): Promise<IEvent> {
|
||||
return this.events.find((e) => e.id === key);
|
||||
return this.events.find((e) => e.id === key)!;
|
||||
}
|
||||
|
||||
async getAll(): Promise<IEvent[]> {
|
||||
|
16
yarn.lock
16
yarn.lock
@ -1271,6 +1271,15 @@
|
||||
resolved "https://registry.yarnpkg.com/@types/owasp-password-strength-test/-/owasp-password-strength-test-1.3.0.tgz#f639e38847eeb0db14bf7b70896cecd4342ac571"
|
||||
integrity sha512-eKYl6svyRua5OVUFm+AXSxdBrKo7snzrCcFv0KoqKNvNgB3fJzRq3s/xphf+jNTllqYxgsx1uWLeAcL4MjLWQQ==
|
||||
|
||||
"@types/pg@8.6.6":
|
||||
version "8.6.6"
|
||||
resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.6.6.tgz#21cdf873a3e345a6e78f394677e3b3b1b543cb80"
|
||||
integrity sha512-O2xNmXebtwVekJDD+02udOncjVcMZQuTEQEMpKJ0ZRf5E7/9JJX3izhKUcUifBkyKpljyUM6BTgy2trmviKlpw==
|
||||
dependencies:
|
||||
"@types/node" "*"
|
||||
pg-protocol "*"
|
||||
pg-types "^2.2.0"
|
||||
|
||||
"@types/prettier@^2.1.5":
|
||||
version "2.7.2"
|
||||
resolved "https://registry.yarnpkg.com/@types/prettier/-/prettier-2.7.2.tgz#6c2324641cc4ba050a8c710b2b251b377581fbf0"
|
||||
@ -5646,12 +5655,17 @@ pg-pool@^3.5.2:
|
||||
resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.5.2.tgz#ed1bed1fb8d79f1c6fd5fb1c99e990fbf9ddf178"
|
||||
integrity sha512-His3Fh17Z4eg7oANLob6ZvH8xIVen3phEZh2QuyrIl4dQSDVEabNducv6ysROKpDNPSD+12tONZVWfSgMvDD9w==
|
||||
|
||||
pg-protocol@*:
|
||||
version "1.6.0"
|
||||
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.6.0.tgz#4c91613c0315349363af2084608db843502f8833"
|
||||
integrity sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q==
|
||||
|
||||
pg-protocol@^1.5.0:
|
||||
version "1.5.0"
|
||||
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0"
|
||||
integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==
|
||||
|
||||
pg-types@^2.1.0:
|
||||
pg-types@^2.1.0, pg-types@^2.2.0:
|
||||
version "2.2.0"
|
||||
resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
|
||||
integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==
|
||||
|
Loading…
Reference in New Issue
Block a user