mirror of
https://github.com/Unleash/unleash.git
synced 2025-01-20 00:08:02 +01:00
feat: allow schedulers to run in a single node (#6794)
## About the changes
This PR provides a service that allows a scheduled function to run in a
single instance. It's currently not in use but tests show how to wrap a
function to make it single-instance:
65b7080e05/src/lib/features/scheduler/job-service.test.ts (L26-L32)
The key `'test'` is used to identify the group and most likely should
have the same name as the scheduled job.
---------
Co-authored-by: Christopher Kolstad <chriswk@getunleash.io>
This commit is contained in:
parent
00d3490764
commit
0a2d40fb8b
68
src/lib/features/scheduler/job-service.test.ts
Normal file
68
src/lib/features/scheduler/job-service.test.ts
Normal file
@ -0,0 +1,68 @@
|
||||
import { createTestConfig } from '../../../test/config/test-config';
|
||||
import { JobStore } from './job-store';
|
||||
import { JobService } from './job-service';
|
||||
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init';
|
||||
|
||||
let db: ITestDb;
|
||||
let store: JobStore;
|
||||
const config = createTestConfig();
|
||||
beforeAll(async () => {
|
||||
db = await dbInit('job_service_serial', config.getLogger);
|
||||
store = new JobStore(db.rawDatabase, config);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await store.deleteAll();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await db.destroy();
|
||||
});
|
||||
|
||||
// note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely
|
||||
test('Only executes job once within time period', async () => {
|
||||
let counter = 0;
|
||||
const service = new JobService(store, config.getLogger);
|
||||
const job = service.singleInstance(
|
||||
'test',
|
||||
async () => {
|
||||
counter++;
|
||||
},
|
||||
60,
|
||||
);
|
||||
await job();
|
||||
await job();
|
||||
expect(counter).toBe(1);
|
||||
const jobs = await store.getAll();
|
||||
expect(jobs).toHaveLength(1);
|
||||
expect(jobs.every((j) => j.finishedAt !== null)).toBe(true);
|
||||
});
|
||||
|
||||
test('Will execute jobs with different keys', async () => {
|
||||
let counter = 0;
|
||||
let otherCounter = 0;
|
||||
const service = new JobService(store, config.getLogger);
|
||||
const incrementCounter = service.singleInstance(
|
||||
'one',
|
||||
async () => {
|
||||
counter++;
|
||||
},
|
||||
60,
|
||||
);
|
||||
const incrementOtherCounter = service.singleInstance(
|
||||
'two',
|
||||
async () => {
|
||||
otherCounter++;
|
||||
},
|
||||
60,
|
||||
);
|
||||
await incrementCounter();
|
||||
await incrementCounter();
|
||||
await incrementOtherCounter();
|
||||
await incrementOtherCounter();
|
||||
expect(counter).toBe(1);
|
||||
expect(otherCounter).toBe(1);
|
||||
const jobs = await store.getAll();
|
||||
expect(jobs).toHaveLength(2);
|
||||
expect(jobs.every((j) => j.finishedAt !== null)).toBe(true);
|
||||
});
|
58
src/lib/features/scheduler/job-service.ts
Normal file
58
src/lib/features/scheduler/job-service.ts
Normal file
@ -0,0 +1,58 @@
|
||||
import type { Logger } from '../../server-impl';
|
||||
import type { JobStore } from './job-store';
|
||||
import type { LogProvider } from '../../logger';
|
||||
import { subMinutes } from 'date-fns';
|
||||
|
||||
export class JobService {
|
||||
private jobStore: JobStore;
|
||||
private logger: Logger;
|
||||
constructor(jobStore: JobStore, logProvider: LogProvider) {
|
||||
this.jobStore = jobStore;
|
||||
this.logger = logProvider('/services/job-service');
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a function in a job that will guarantee the function is executed
|
||||
* in a mutually exclusive way in a single instance of the cluster, at most
|
||||
* once every {@param bucketSizeInMinutes}.
|
||||
*
|
||||
* The key identifies the job group: only one job in the group will execute
|
||||
* at any given time.
|
||||
*
|
||||
* Note: buckets begin at the start of the time span
|
||||
*/
|
||||
public singleInstance(
|
||||
key: string,
|
||||
fn: (range?: { from: Date; to: Date }) => Promise<unknown>,
|
||||
bucketSizeInMinutes = 5,
|
||||
): () => Promise<unknown> {
|
||||
return async () => {
|
||||
const acquired = await this.jobStore.acquireBucket(
|
||||
key,
|
||||
bucketSizeInMinutes,
|
||||
);
|
||||
|
||||
if (acquired) {
|
||||
const { name, bucket } = acquired;
|
||||
this.logger.info(
|
||||
`Acquired job lock for ${name} from >= ${subMinutes(
|
||||
bucket,
|
||||
bucketSizeInMinutes,
|
||||
)} to < ${bucket}`,
|
||||
);
|
||||
try {
|
||||
const range = {
|
||||
from: subMinutes(bucket, bucketSizeInMinutes),
|
||||
to: bucket,
|
||||
};
|
||||
return fn(range);
|
||||
} finally {
|
||||
await this.jobStore.update(name, bucket, {
|
||||
stage: 'completed',
|
||||
finishedAt: new Date(),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
30
src/lib/features/scheduler/job-store.test.ts
Normal file
30
src/lib/features/scheduler/job-store.test.ts
Normal file
@ -0,0 +1,30 @@
|
||||
import { createTestConfig } from '../../../test/config/test-config';
|
||||
import { JobStore } from './job-store';
|
||||
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init';
|
||||
|
||||
let db: ITestDb;
|
||||
const config = createTestConfig();
|
||||
beforeAll(async () => {
|
||||
db = await dbInit('job_store_serial', config.getLogger);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await db.destroy();
|
||||
});
|
||||
|
||||
test('cannot acquireBucket twice', async () => {
|
||||
const store = new JobStore(db.rawDatabase, config);
|
||||
// note: this might be flaky if the test runs exactly at 59 minutes and 59 seconds of an hour and 999 milliseconds but should be unlikely
|
||||
const bucket = await store.acquireBucket('test', 60);
|
||||
expect(bucket).toBeDefined();
|
||||
const bucket2 = await store.acquireBucket('test', 60);
|
||||
expect(bucket2).toBeUndefined();
|
||||
});
|
||||
|
||||
test('Can acquire bucket for two different key names within the same period', async () => {
|
||||
const store = new JobStore(db.rawDatabase, config);
|
||||
const firstBucket = await store.acquireBucket('first', 60);
|
||||
const secondBucket = await store.acquireBucket('second', 60);
|
||||
expect(firstBucket).toBeDefined();
|
||||
expect(secondBucket).toBeDefined();
|
||||
});
|
110
src/lib/features/scheduler/job-store.ts
Normal file
110
src/lib/features/scheduler/job-store.ts
Normal file
@ -0,0 +1,110 @@
|
||||
import type { Store } from '../../types/stores/store';
|
||||
import type { Db, IUnleashConfig, Logger } from '../../server-impl';
|
||||
import metricsHelper from '../../util/metrics-helper';
|
||||
import { DB_TIME } from '../../metric-events';
|
||||
import type { Row } from '../../db/crud/row-type';
|
||||
import { defaultToRow } from '../../db/crud/default-mappings';
|
||||
|
||||
export type JobModel = {
|
||||
name: string;
|
||||
bucket: Date;
|
||||
stage: 'started' | 'completed' | 'failed';
|
||||
finishedAt?: Date;
|
||||
};
|
||||
|
||||
const TABLE = 'jobs';
|
||||
const toRow = (data: Partial<JobModel>) =>
|
||||
defaultToRow<JobModel, Row<JobModel>>(data);
|
||||
|
||||
export class JobStore
|
||||
implements Store<JobModel, { name: string; bucket: Date }>
|
||||
{
|
||||
private logger: Logger;
|
||||
protected readonly timer: (action: string) => Function;
|
||||
private db: Db;
|
||||
|
||||
constructor(
|
||||
db: Db,
|
||||
config: Pick<IUnleashConfig, 'eventBus' | 'getLogger'>,
|
||||
) {
|
||||
this.db = db;
|
||||
this.logger = config.getLogger('job-store');
|
||||
this.timer = (action: string) =>
|
||||
metricsHelper.wrapTimer(config.eventBus, DB_TIME, {
|
||||
store: TABLE,
|
||||
action,
|
||||
});
|
||||
}
|
||||
|
||||
async acquireBucket(
|
||||
key: string,
|
||||
bucketLengthInMinutes: number,
|
||||
): Promise<{ name: string; bucket: Date } | undefined> {
|
||||
const endTimer = this.timer('acquireBucket');
|
||||
|
||||
const bucket = await this.db<Row<JobModel>>(TABLE)
|
||||
.insert({
|
||||
name: key,
|
||||
// note: date_floor_round is a custom function defined in the DB
|
||||
bucket: this.db.raw(
|
||||
`date_floor_round(now(), '${bucketLengthInMinutes} minutes')`,
|
||||
),
|
||||
stage: 'started',
|
||||
})
|
||||
.onConflict(['name', 'bucket'])
|
||||
.ignore()
|
||||
.returning(['name', 'bucket']);
|
||||
|
||||
endTimer();
|
||||
return bucket[0];
|
||||
}
|
||||
|
||||
async update(
|
||||
name: string,
|
||||
bucket: Date,
|
||||
data: Partial<Omit<JobModel, 'name' | 'bucket'>>,
|
||||
): Promise<JobModel> {
|
||||
const rows = await this.db<Row<JobModel>>(TABLE)
|
||||
.update(toRow(data))
|
||||
.where({ name, bucket })
|
||||
.returning('*');
|
||||
return rows[0];
|
||||
}
|
||||
|
||||
async get(pk: { name: string; bucket: Date }): Promise<JobModel> {
|
||||
const rows = await this.db(TABLE).where(pk);
|
||||
return rows[0];
|
||||
}
|
||||
|
||||
async getAll(query?: Object | undefined): Promise<JobModel[]> {
|
||||
if (query) {
|
||||
return this.db(TABLE).where(query);
|
||||
}
|
||||
return this.db(TABLE);
|
||||
}
|
||||
|
||||
async exists(key: { name: string; bucket: Date }): Promise<boolean> {
|
||||
const result = await this.db.raw(
|
||||
`SELECT EXISTS(SELECT 1 FROM ${TABLE} WHERE name = ? AND bucket = ?) AS present`,
|
||||
[key.name, key.bucket],
|
||||
);
|
||||
const { present } = result.rows[0];
|
||||
return present;
|
||||
}
|
||||
|
||||
async delete(key: { name: string; bucket: Date }): Promise<void> {
|
||||
await this.db(TABLE).where(key).delete();
|
||||
}
|
||||
|
||||
async deleteAll(): Promise<void> {
|
||||
return this.db(TABLE).delete();
|
||||
}
|
||||
|
||||
destroy(): void {}
|
||||
|
||||
async count(): Promise<number> {
|
||||
return this.db(TABLE)
|
||||
.count()
|
||||
.then((res) => Number(res[0].count));
|
||||
}
|
||||
}
|
@ -133,17 +133,13 @@ export const scheduleServices = async (
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.bulkAdd().catch(console.error);
|
||||
},
|
||||
() => clientMetricsServiceV2.bulkAdd().catch(console.error),
|
||||
secondsToMilliseconds(5),
|
||||
'bulkAddMetrics',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.clearMetrics(48).catch(console.error);
|
||||
},
|
||||
() => clientMetricsServiceV2.clearMetrics(48).catch(console.error),
|
||||
hoursToMilliseconds(12),
|
||||
'clearMetrics',
|
||||
);
|
||||
|
@ -38,7 +38,7 @@ export class SchedulerService {
|
||||
}
|
||||
|
||||
async schedule(
|
||||
scheduledFunction: () => void,
|
||||
scheduledFunction: () => Promise<unknown>,
|
||||
timeMs: number,
|
||||
id: string,
|
||||
jitter = randomJitter(2 * 1000, 30 * 1000, timeMs),
|
||||
|
@ -664,7 +664,12 @@ export default class MetricsMonitor {
|
||||
});
|
||||
|
||||
await schedulerService.schedule(
|
||||
this.registerPoolMetrics.bind(this, db.client.pool, eventBus),
|
||||
async () =>
|
||||
this.registerPoolMetrics.bind(
|
||||
this,
|
||||
db.client.pool,
|
||||
eventBus,
|
||||
),
|
||||
minutesToMilliseconds(1),
|
||||
'registerPoolMetrics',
|
||||
0, // no jitter
|
||||
|
@ -125,6 +125,8 @@ import {
|
||||
createFakeProjectInsightsService,
|
||||
createProjectInsightsService,
|
||||
} from '../features/project-insights/createProjectInsightsService';
|
||||
import { JobService } from '../features/scheduler/job-service';
|
||||
import { JobStore } from '../features/scheduler/job-store';
|
||||
import { FeatureLifecycleService } from '../features/feature-lifecycle/feature-lifecycle-service';
|
||||
import { createFakeFeatureLifecycleService } from '../features/feature-lifecycle/createFeatureLifecycle';
|
||||
|
||||
@ -350,6 +352,12 @@ export const createServices = (
|
||||
const inactiveUsersService = new InactiveUsersService(stores, config, {
|
||||
userService,
|
||||
});
|
||||
|
||||
const jobService = new JobService(
|
||||
new JobStore(db!, config),
|
||||
config.getLogger,
|
||||
);
|
||||
|
||||
const { featureLifecycleService } = db
|
||||
? createFeatureLifecycleService(db, config)
|
||||
: createFakeFeatureLifecycleService(config);
|
||||
@ -413,6 +421,7 @@ export const createServices = (
|
||||
featureSearchService,
|
||||
inactiveUsersService,
|
||||
projectInsightsService,
|
||||
jobService,
|
||||
featureLifecycleService,
|
||||
};
|
||||
};
|
||||
@ -461,5 +470,6 @@ export {
|
||||
ClientFeatureToggleService,
|
||||
FeatureSearchService,
|
||||
ProjectInsightsService,
|
||||
JobService,
|
||||
FeatureLifecycleService,
|
||||
};
|
||||
|
@ -53,6 +53,7 @@ import type { ClientFeatureToggleService } from '../features/client-feature-togg
|
||||
import type { FeatureSearchService } from '../features/feature-search/feature-search-service';
|
||||
import type { InactiveUsersService } from '../users/inactive/inactive-users-service';
|
||||
import type { ProjectInsightsService } from '../features/project-insights/project-insights-service';
|
||||
import type { JobService } from '../features/scheduler/job-service';
|
||||
import type { FeatureLifecycleService } from '../features/feature-lifecycle/feature-lifecycle-service';
|
||||
|
||||
export interface IUnleashServices {
|
||||
@ -116,5 +117,6 @@ export interface IUnleashServices {
|
||||
featureSearchService: FeatureSearchService;
|
||||
inactiveUsersService: InactiveUsersService;
|
||||
projectInsightsService: ProjectInsightsService;
|
||||
jobService: JobService;
|
||||
featureLifecycleService: FeatureLifecycleService;
|
||||
}
|
||||
|
37
src/migrations/20240405174629-jobs.js
Normal file
37
src/migrations/20240405174629-jobs.js
Normal file
@ -0,0 +1,37 @@
|
||||
exports.up = function (db, cb) {
|
||||
db.runSql(
|
||||
`
|
||||
-- this function rounds a date to the nearest interval
|
||||
CREATE OR REPLACE FUNCTION date_floor_round(base_date timestamptz, round_interval interval) RETURNS timestamptz AS $BODY$
|
||||
SELECT to_timestamp(
|
||||
(EXTRACT(epoch FROM $1)::integer / EXTRACT(epoch FROM $2)::integer)
|
||||
* EXTRACT(epoch FROM $2)::integer
|
||||
)
|
||||
$BODY$ LANGUAGE SQL STABLE;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
name TEXT NOT NULL,
|
||||
bucket TIMESTAMPTZ NOT NULL,
|
||||
stage TEXT NOT NULL,
|
||||
finished_at TIMESTAMPTZ,
|
||||
PRIMARY KEY (name, bucket)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_job_finished ON jobs(finished_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_job_stage ON jobs(stage);
|
||||
`,
|
||||
cb,
|
||||
);
|
||||
};
|
||||
|
||||
exports.down = function (db, cb) {
|
||||
db.runSql(
|
||||
`
|
||||
DROP INDEX IF EXISTS idx_job_finished;
|
||||
DROP INDEX IF EXISTS idx_job_stage;
|
||||
DROP TABLE IF EXISTS jobs;
|
||||
|
||||
`,
|
||||
cb,
|
||||
);
|
||||
};
|
Loading…
Reference in New Issue
Block a user