diff --git a/package.json b/package.json index ad3b041644..40fe3e91dd 100644 --- a/package.json +++ b/package.json @@ -161,6 +161,7 @@ "@types/node": "16.18.21", "@types/nodemailer": "6.4.7", "@types/owasp-password-strength-test": "1.3.0", + "@types/pg": "8.6.6", "@types/semver": "7.3.13", "@types/stoppable": "1.1.1", "@types/supertest": "2.0.12", diff --git a/src/lib/__snapshots__/create-config.test.ts.snap b/src/lib/__snapshots__/create-config.test.ts.snap index 9cf175e93f..1db73081b2 100644 --- a/src/lib/__snapshots__/create-config.test.ts.snap +++ b/src/lib/__snapshots__/create-config.test.ts.snap @@ -79,6 +79,7 @@ exports[`should create default config 1`] = ` "loginHistory": false, "maintenanceMode": false, "messageBanner": false, + "migrationLock": false, "newProjectOverview": false, "optimal304": false, "optimal304Differ": false, @@ -105,6 +106,7 @@ exports[`should create default config 1`] = ` "loginHistory": false, "maintenanceMode": false, "messageBanner": false, + "migrationLock": false, "newProjectOverview": false, "optimal304": false, "optimal304Differ": false, diff --git a/src/lib/server-impl.test.ts b/src/lib/server-impl.test.ts index ad104962e6..f951fa970f 100644 --- a/src/lib/server-impl.test.ts +++ b/src/lib/server-impl.test.ts @@ -1,7 +1,7 @@ -import { EventEmitter } from 'events'; import express from 'express'; import { createTestConfig } from '../test/config/test-config'; import { start, create } from './server-impl'; +import FakeEventStore from '../test/fixtures/fake-event-store'; jest.mock( './routes', @@ -15,7 +15,7 @@ jest.mock( const noop = () => {}; -const eventStore = new EventEmitter(); +const eventStore = new FakeEventStore(); const settingStore = { get: () => { Promise.resolve('secret'); @@ -54,6 +54,10 @@ jest.mock('../migrator', () => ({ migrateDb: () => Promise.resolve(), })); +jest.mock('./util/db-lock', () => ({ + withDbLock: () => (fn) => fn, +})); + jest.mock( './util/version', () => @@ -123,5 +127,5 @@ test('should shutdown the server when calling stop()', async () => { createTestConfig({ server: { port: 0 } }), ); await stop(); - expect(server.address()).toBe(null); + expect(server!.address()).toBe(null); }); diff --git a/src/lib/server-impl.ts b/src/lib/server-impl.ts index eb02bb0b25..f762365929 100644 --- a/src/lib/server-impl.ts +++ b/src/lib/server-impl.ts @@ -32,6 +32,7 @@ import { Knex } from 'knex'; import * as permissions from './types/permissions'; import * as eventType from './types/events'; import { Db } from './db/db'; +import { defaultLockKey, defaultTimeout, withDbLock } from './util/db-lock'; async function createApp( config: IUnleashConfig, @@ -138,7 +139,18 @@ async function start(opts: IUnleashOptions = {}): Promise { logger.info('DB migration: disabled'); } else { logger.debug('DB migration: start'); - await migrateDb(config); + if (opts.flagResolver?.isEnabled('migrationLock')) { + logger.info('Running migration with lock'); + const lock = withDbLock(config.db, { + lockKey: defaultLockKey, + timeout: defaultTimeout, + logger, + }); + await lock(migrateDb)(config); + } else { + await migrateDb(config); + } + logger.debug('DB migration: end'); } } catch (err) { diff --git a/src/lib/types/experimental.ts b/src/lib/types/experimental.ts index b83d5fb2b3..22b8272d51 100644 --- a/src/lib/types/experimental.ts +++ b/src/lib/types/experimental.ts @@ -77,6 +77,7 @@ const flags = { process.env.UNLEASH_EXPERIMENTAL_OPTIMAL_304_DIFFER, false, ), + migrationLock: parseEnvVarBoolean(process.env.MIGRATION_LOCK, false), }; export const defaultExperimentalOptions: IExperimentalOptions = { diff --git a/src/lib/util/db-lock.test.ts b/src/lib/util/db-lock.test.ts new file mode 100644 index 0000000000..91c57e86a2 --- /dev/null +++ b/src/lib/util/db-lock.test.ts @@ -0,0 +1,69 @@ +import { withDbLock } from './db-lock'; +import { getDbConfig } from '../../test/e2e/helpers/database-config'; +import { IDBOption } from '../types'; +import { Logger } from '../logger'; + +test('should lock access to any action', async () => { + const lock = withDbLock(getDbConfig() as IDBOption); + + const asyncAction = (input: string) => Promise.resolve(`result: ${input}`); + + const result = await lock(asyncAction)('data'); + + expect(result).toBe('result: data'); +}); + +const ms = (millis: number) => + new Promise((resolve) => { + setTimeout(() => resolve('time'), millis); + }); + +test('should await other actions on lock', async () => { + const lock = withDbLock(getDbConfig() as IDBOption); + + const results: string[] = []; + const slowAsyncAction = (input: string) => { + return new Promise((resolve) => { + setTimeout(() => { + results.push(input); + resolve(input); + }, 200); + }); + }; + const fastAction = async (input: string) => { + results.push(input); + }; + + const lockedAction = lock(slowAsyncAction); + const lockedAnotherAction = lock(fastAction); + + // deliberately skipped await to simulate another server running slow operation + lockedAction('first'); + await ms(100); // start fast action after slow action established DB connection + await lockedAnotherAction('second'); + + await expect(results).toStrictEqual(['first', 'second']); +}); + +test('should handle lock timeout', async () => { + const timeoutMs = 1; + let loggedError = ''; + const lock = withDbLock(getDbConfig() as IDBOption, { + lockKey: 1, + timeout: timeoutMs, + logger: { + error(msg: string) { + loggedError = msg; + }, + } as unknown as Logger, + }); + + // the query should fail because of the timeout. This one is a fallback when timeout + // was not triggered in the integration test + const asyncAction = () => Promise.reject(new Error('Query read timeout')); + + await expect(lock(asyncAction)()).rejects.toStrictEqual( + new Error('Query read timeout'), + ); + expect(loggedError).toBe('Locking error: Query read timeout'); +}); diff --git a/src/lib/util/db-lock.ts b/src/lib/util/db-lock.ts new file mode 100644 index 0000000000..55562ebe54 --- /dev/null +++ b/src/lib/util/db-lock.ts @@ -0,0 +1,43 @@ +import { Client } from 'pg'; +import { IDBOption } from '../types'; +import { Logger } from '../logger'; + +export const defaultLockKey = 479341; +export const defaultTimeout = 5000; + +interface IDbLockOptions { + timeout: number; + lockKey: number; + logger: Logger; +} + +const defaultOptions: IDbLockOptions = { + timeout: defaultTimeout, + lockKey: defaultLockKey, + logger: { ...console, fatal: console.error }, +}; + +export const withDbLock = + (dbConfig: IDBOption, config = defaultOptions) => + (fn: (...args: A) => Promise) => + async (...args: A): Promise => { + const client = new Client({ + ...dbConfig, + query_timeout: config.timeout, + }); + try { + await client.connect(); + // wait to obtain a lock + await client.query('SELECT pg_advisory_lock($1)', [config.lockKey]); + const result = await fn(...args); + return result; + } catch (e) { + config.logger.error(`Locking error: ${e.message}`); + throw e; + } finally { + await client.query('SELECT pg_advisory_unlock($1)', [ + config.lockKey, + ]); + await client.end(); + } + }; diff --git a/src/test/fixtures/fake-event-store.ts b/src/test/fixtures/fake-event-store.ts index af694b289c..d5dc3b0f39 100644 --- a/src/test/fixtures/fake-event-store.ts +++ b/src/test/fixtures/fake-event-store.ts @@ -16,7 +16,7 @@ class FakeEventStore implements IEventStore { } getMaxRevisionId(): Promise { - throw new Error('Method not implemented.'); + return Promise.resolve(1); } store(event: IEvent): Promise { @@ -64,7 +64,7 @@ class FakeEventStore implements IEventStore { } async get(key: number): Promise { - return this.events.find((e) => e.id === key); + return this.events.find((e) => e.id === key)!; } async getAll(): Promise { diff --git a/yarn.lock b/yarn.lock index d2d387d7b7..75bca555bb 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1271,6 +1271,15 @@ resolved "https://registry.yarnpkg.com/@types/owasp-password-strength-test/-/owasp-password-strength-test-1.3.0.tgz#f639e38847eeb0db14bf7b70896cecd4342ac571" integrity sha512-eKYl6svyRua5OVUFm+AXSxdBrKo7snzrCcFv0KoqKNvNgB3fJzRq3s/xphf+jNTllqYxgsx1uWLeAcL4MjLWQQ== +"@types/pg@8.6.6": + version "8.6.6" + resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.6.6.tgz#21cdf873a3e345a6e78f394677e3b3b1b543cb80" + integrity sha512-O2xNmXebtwVekJDD+02udOncjVcMZQuTEQEMpKJ0ZRf5E7/9JJX3izhKUcUifBkyKpljyUM6BTgy2trmviKlpw== + dependencies: + "@types/node" "*" + pg-protocol "*" + pg-types "^2.2.0" + "@types/prettier@^2.1.5": version "2.7.2" resolved "https://registry.yarnpkg.com/@types/prettier/-/prettier-2.7.2.tgz#6c2324641cc4ba050a8c710b2b251b377581fbf0" @@ -5646,12 +5655,17 @@ pg-pool@^3.5.2: resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.5.2.tgz#ed1bed1fb8d79f1c6fd5fb1c99e990fbf9ddf178" integrity sha512-His3Fh17Z4eg7oANLob6ZvH8xIVen3phEZh2QuyrIl4dQSDVEabNducv6ysROKpDNPSD+12tONZVWfSgMvDD9w== +pg-protocol@*: + version "1.6.0" + resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.6.0.tgz#4c91613c0315349363af2084608db843502f8833" + integrity sha512-M+PDm637OY5WM307051+bsDia5Xej6d9IR4GwJse1qA1DIhiKlksvrneZOYQq42OM+spubpcNYEo2FcKQrDk+Q== + pg-protocol@^1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0" integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ== -pg-types@^2.1.0: +pg-types@^2.1.0, pg-types@^2.2.0: version "2.2.0" resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3" integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==