diff --git a/src/lib/features/scheduler/job-service.test.ts b/src/lib/features/scheduler/job-service.test.ts new file mode 100644 index 0000000000..a7486c826b --- /dev/null +++ b/src/lib/features/scheduler/job-service.test.ts @@ -0,0 +1,68 @@ +import { createTestConfig } from '../../../test/config/test-config'; +import { JobStore } from './job-store'; +import { JobService } from './job-service'; +import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; + +let db: ITestDb; +let store: JobStore; +const config = createTestConfig(); +beforeAll(async () => { + db = await dbInit('job_service_serial', config.getLogger); + store = new JobStore(db.rawDatabase, config); +}); + +afterEach(async () => { + await store.deleteAll(); +}); + +afterAll(async () => { + await db.destroy(); +}); + +// note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely +test('Only executes job once within time period', async () => { + let counter = 0; + const service = new JobService(store, config.getLogger); + const job = service.singleInstance( + 'test', + async () => { + counter++; + }, + 60, + ); + await job(); + await job(); + expect(counter).toBe(1); + const jobs = await store.getAll(); + expect(jobs).toHaveLength(1); + expect(jobs.every((j) => j.finishedAt !== null)).toBe(true); +}); + +test('Will execute jobs with different keys', async () => { + let counter = 0; + let otherCounter = 0; + const service = new JobService(store, config.getLogger); + const incrementCounter = service.singleInstance( + 'one', + async () => { + counter++; + }, + 60, + ); + const incrementOtherCounter = service.singleInstance( + 'two', + async () => { + otherCounter++; + }, + 60, + ); + await incrementCounter(); + await incrementCounter(); + await incrementOtherCounter(); + await incrementOtherCounter(); + expect(counter).toBe(1); + expect(otherCounter).toBe(1); + const jobs = await store.getAll(); + expect(jobs).toHaveLength(2); + expect(jobs.every((j) => j.finishedAt !== null)).toBe(true); +}); diff --git a/src/lib/features/scheduler/job-service.ts b/src/lib/features/scheduler/job-service.ts new file mode 100644 index 0000000000..c915e5c778 --- /dev/null +++ b/src/lib/features/scheduler/job-service.ts @@ -0,0 +1,58 @@ +import type { Logger } from '../../server-impl'; +import type { JobStore } from './job-store'; +import type { LogProvider } from '../../logger'; +import { subMinutes } from 'date-fns'; + +export class JobService { + private jobStore: JobStore; + private logger: Logger; + constructor(jobStore: JobStore, logProvider: LogProvider) { + this.jobStore = jobStore; + this.logger = logProvider('/services/job-service'); + } + + /** + * Wraps a function in a job that will guarantee the function is executed + * in a mutually exclusive way in a single instance of the cluster, at most + * once every {@param bucketSizeInMinutes}. + * + * The key identifies the job group: only one job in the group will execute + * at any given time. + * + * Note: buckets begin at the start of the time span + */ + public singleInstance( + key: string, + fn: (range?: { from: Date; to: Date }) => Promise, + bucketSizeInMinutes = 5, + ): () => Promise { + return async () => { + const acquired = await this.jobStore.acquireBucket( + key, + bucketSizeInMinutes, + ); + + if (acquired) { + const { name, bucket } = acquired; + this.logger.info( + `Acquired job lock for ${name} from >= ${subMinutes( + bucket, + bucketSizeInMinutes, + )} to < ${bucket}`, + ); + try { + const range = { + from: subMinutes(bucket, bucketSizeInMinutes), + to: bucket, + }; + return fn(range); + } finally { + await this.jobStore.update(name, bucket, { + stage: 'completed', + finishedAt: new Date(), + }); + } + } + }; + } +} diff --git a/src/lib/features/scheduler/job-store.test.ts b/src/lib/features/scheduler/job-store.test.ts new file mode 100644 index 0000000000..69cdc51f25 --- /dev/null +++ b/src/lib/features/scheduler/job-store.test.ts @@ -0,0 +1,30 @@ +import { createTestConfig } from '../../../test/config/test-config'; +import { JobStore } from './job-store'; +import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; + +let db: ITestDb; +const config = createTestConfig(); +beforeAll(async () => { + db = await dbInit('job_store_serial', config.getLogger); +}); + +afterAll(async () => { + await db.destroy(); +}); + +test('cannot acquireBucket twice', async () => { + const store = new JobStore(db.rawDatabase, config); + // note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely + const bucket = await store.acquireBucket('test', 60); + expect(bucket).toBeDefined(); + const bucket2 = await store.acquireBucket('test', 60); + expect(bucket2).toBeUndefined(); +}); + +test('Can acquire bucket for two different key names within the same period', async () => { + const store = new JobStore(db.rawDatabase, config); + const firstBucket = await store.acquireBucket('first', 60); + const secondBucket = await store.acquireBucket('second', 60); + expect(firstBucket).toBeDefined(); + expect(secondBucket).toBeDefined(); +}); diff --git a/src/lib/features/scheduler/job-store.ts b/src/lib/features/scheduler/job-store.ts new file mode 100644 index 0000000000..d32e5707db --- /dev/null +++ b/src/lib/features/scheduler/job-store.ts @@ -0,0 +1,110 @@ +import type { Store } from '../../types/stores/store'; +import type { Db, IUnleashConfig, Logger } from '../../server-impl'; +import metricsHelper from '../../util/metrics-helper'; +import { DB_TIME } from '../../metric-events'; +import type { Row } from '../../db/crud/row-type'; +import { defaultToRow } from '../../db/crud/default-mappings'; + +export type JobModel = { + name: string; + bucket: Date; + stage: 'started' | 'completed' | 'failed'; + finishedAt?: Date; +}; + +const TABLE = 'jobs'; +const toRow = (data: Partial) => + defaultToRow>(data); + +export class JobStore + implements Store +{ + private logger: Logger; + protected readonly timer: (action: string) => Function; + private db: Db; + + constructor( + db: Db, + config: Pick, + ) { + this.db = db; + this.logger = config.getLogger('job-store'); + this.timer = (action: string) => + metricsHelper.wrapTimer(config.eventBus, DB_TIME, { + store: TABLE, + action, + }); + } + + async acquireBucket( + key: string, + bucketLengthInMinutes: number, + ): Promise<{ name: string; bucket: Date } | undefined> { + const endTimer = this.timer('acquireBucket'); + + const bucket = await this.db>(TABLE) + .insert({ + name: key, + // note: date_floor_round is a custom function defined in the DB + bucket: this.db.raw( + `date_floor_round(now(), '${bucketLengthInMinutes} minutes')`, + ), + stage: 'started', + }) + .onConflict(['name', 'bucket']) + .ignore() + .returning(['name', 'bucket']); + + endTimer(); + return bucket[0]; + } + + async update( + name: string, + bucket: Date, + data: Partial>, + ): Promise { + const rows = await this.db>(TABLE) + .update(toRow(data)) + .where({ name, bucket }) + .returning('*'); + return rows[0]; + } + + async get(pk: { name: string; bucket: Date }): Promise { + const rows = await this.db(TABLE).where(pk); + return rows[0]; + } + + async getAll(query?: Object | undefined): Promise { + if (query) { + return this.db(TABLE).where(query); + } + return this.db(TABLE); + } + + async exists(key: { name: string; bucket: Date }): Promise { + const result = await this.db.raw( + `SELECT EXISTS(SELECT 1 FROM ${TABLE} WHERE name = ? AND bucket = ?) AS present`, + [key.name, key.bucket], + ); + const { present } = result.rows[0]; + return present; + } + + async delete(key: { name: string; bucket: Date }): Promise { + await this.db(TABLE).where(key).delete(); + } + + async deleteAll(): Promise { + return this.db(TABLE).delete(); + } + + destroy(): void {} + + async count(): Promise { + return this.db(TABLE) + .count() + .then((res) => Number(res[0].count)); + } +} diff --git a/src/lib/features/scheduler/schedule-services.ts b/src/lib/features/scheduler/schedule-services.ts index 25e417c279..e8aece5865 100644 --- a/src/lib/features/scheduler/schedule-services.ts +++ b/src/lib/features/scheduler/schedule-services.ts @@ -133,17 +133,13 @@ export const scheduleServices = async ( ); schedulerService.schedule( - () => { - clientMetricsServiceV2.bulkAdd().catch(console.error); - }, + () => clientMetricsServiceV2.bulkAdd().catch(console.error), secondsToMilliseconds(5), 'bulkAddMetrics', ); schedulerService.schedule( - () => { - clientMetricsServiceV2.clearMetrics(48).catch(console.error); - }, + () => clientMetricsServiceV2.clearMetrics(48).catch(console.error), hoursToMilliseconds(12), 'clearMetrics', ); diff --git a/src/lib/features/scheduler/scheduler-service.ts b/src/lib/features/scheduler/scheduler-service.ts index ca64a9f3a6..b95886c113 100644 --- a/src/lib/features/scheduler/scheduler-service.ts +++ b/src/lib/features/scheduler/scheduler-service.ts @@ -38,7 +38,7 @@ export class SchedulerService { } async schedule( - scheduledFunction: () => void, + scheduledFunction: () => Promise, timeMs: number, id: string, jitter = randomJitter(2 * 1000, 30 * 1000, timeMs), diff --git a/src/lib/metrics.ts b/src/lib/metrics.ts index 186a18caa8..927c1811d2 100644 --- a/src/lib/metrics.ts +++ b/src/lib/metrics.ts @@ -664,7 +664,12 @@ export default class MetricsMonitor { }); await schedulerService.schedule( - this.registerPoolMetrics.bind(this, db.client.pool, eventBus), + async () => + this.registerPoolMetrics.bind( + this, + db.client.pool, + eventBus, + ), minutesToMilliseconds(1), 'registerPoolMetrics', 0, // no jitter diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index a5d78781b2..2eafd2f7fa 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -125,6 +125,8 @@ import { createFakeProjectInsightsService, createProjectInsightsService, } from '../features/project-insights/createProjectInsightsService'; +import { JobService } from '../features/scheduler/job-service'; +import { JobStore } from '../features/scheduler/job-store'; import { FeatureLifecycleService } from '../features/feature-lifecycle/feature-lifecycle-service'; import { createFakeFeatureLifecycleService } from '../features/feature-lifecycle/createFeatureLifecycle'; @@ -350,6 +352,12 @@ export const createServices = ( const inactiveUsersService = new InactiveUsersService(stores, config, { userService, }); + + const jobService = new JobService( + new JobStore(db!, config), + config.getLogger, + ); + const { featureLifecycleService } = db ? createFeatureLifecycleService(db, config) : createFakeFeatureLifecycleService(config); @@ -413,6 +421,7 @@ export const createServices = ( featureSearchService, inactiveUsersService, projectInsightsService, + jobService, featureLifecycleService, }; }; @@ -461,5 +470,6 @@ export { ClientFeatureToggleService, FeatureSearchService, ProjectInsightsService, + JobService, FeatureLifecycleService, }; diff --git a/src/lib/types/services.ts b/src/lib/types/services.ts index 18cbb84c43..1c3690dced 100644 --- a/src/lib/types/services.ts +++ b/src/lib/types/services.ts @@ -53,6 +53,7 @@ import type { ClientFeatureToggleService } from '../features/client-feature-togg import type { FeatureSearchService } from '../features/feature-search/feature-search-service'; import type { InactiveUsersService } from '../users/inactive/inactive-users-service'; import type { ProjectInsightsService } from '../features/project-insights/project-insights-service'; +import type { JobService } from '../features/scheduler/job-service'; import type { FeatureLifecycleService } from '../features/feature-lifecycle/feature-lifecycle-service'; export interface IUnleashServices { @@ -116,5 +117,6 @@ export interface IUnleashServices { featureSearchService: FeatureSearchService; inactiveUsersService: InactiveUsersService; projectInsightsService: ProjectInsightsService; + jobService: JobService; featureLifecycleService: FeatureLifecycleService; } diff --git a/src/migrations/20240405174629-jobs.js b/src/migrations/20240405174629-jobs.js new file mode 100644 index 0000000000..5f4881ff86 --- /dev/null +++ b/src/migrations/20240405174629-jobs.js @@ -0,0 +1,37 @@ +exports.up = function (db, cb) { + db.runSql( + ` +-- this function rounds a date to the nearest interval +CREATE OR REPLACE FUNCTION date_floor_round(base_date timestamptz, round_interval interval) RETURNS timestamptz AS $BODY$ +SELECT to_timestamp( + (EXTRACT(epoch FROM $1)::integer / EXTRACT(epoch FROM $2)::integer) + * EXTRACT(epoch FROM $2)::integer +) +$BODY$ LANGUAGE SQL STABLE; + +CREATE TABLE IF NOT EXISTS jobs ( + name TEXT NOT NULL, + bucket TIMESTAMPTZ NOT NULL, + stage TEXT NOT NULL, + finished_at TIMESTAMPTZ, + PRIMARY KEY (name, bucket) +); + +CREATE INDEX IF NOT EXISTS idx_job_finished ON jobs(finished_at); +CREATE INDEX IF NOT EXISTS idx_job_stage ON jobs(stage); + `, + cb, + ); +}; + +exports.down = function (db, cb) { + db.runSql( + ` + DROP INDEX IF EXISTS idx_job_finished; + DROP INDEX IF EXISTS idx_job_stage; + DROP TABLE IF EXISTS jobs; + + `, + cb, + ); +};