mirror of
https://github.com/Unleash/unleash.git
synced 2025-02-09 00:18:00 +01:00
feat: scheduler overrun protection (#6082)
This commit is contained in:
parent
73322f12f7
commit
f298d7d511
@ -69,10 +69,11 @@ const createSchedulerTestService = ({
|
|||||||
|
|
||||||
test('Schedules job immediately', async () => {
|
test('Schedules job immediately', async () => {
|
||||||
const { schedulerService } = createSchedulerTestService();
|
const { schedulerService } = createSchedulerTestService();
|
||||||
|
const NO_JITTER = 0;
|
||||||
|
|
||||||
const job = jest.fn();
|
const job = jest.fn();
|
||||||
|
|
||||||
await schedulerService.schedule(job, 10, 'test-id', 0);
|
await schedulerService.schedule(job, 10, 'test-id', NO_JITTER);
|
||||||
|
|
||||||
expect(job).toBeCalledTimes(1);
|
expect(job).toBeCalledTimes(1);
|
||||||
schedulerService.stop();
|
schedulerService.stop();
|
||||||
@ -262,3 +263,23 @@ test('Does not apply jitter if schedule interval is smaller than max jitter', as
|
|||||||
|
|
||||||
schedulerService.stop();
|
schedulerService.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('Does not allow to run scheduled job when it is already pending', async () => {
|
||||||
|
const { schedulerService } = createSchedulerTestService();
|
||||||
|
const NO_JITTER = 0;
|
||||||
|
|
||||||
|
const job = jest.fn();
|
||||||
|
const slowJob = async () => {
|
||||||
|
job();
|
||||||
|
await ms(25);
|
||||||
|
};
|
||||||
|
|
||||||
|
void schedulerService.schedule(slowJob, 10, 'test-id', NO_JITTER);
|
||||||
|
|
||||||
|
// scheduler had 2 chances to run but the initial slowJob was pending
|
||||||
|
await ms(25);
|
||||||
|
|
||||||
|
expect(job).toBeCalledTimes(1);
|
||||||
|
|
||||||
|
schedulerService.stop();
|
||||||
|
});
|
||||||
|
@ -25,6 +25,8 @@ export class SchedulerService {
|
|||||||
|
|
||||||
private eventBus: EventEmitter;
|
private eventBus: EventEmitter;
|
||||||
|
|
||||||
|
private executingSchedulers: Set<string> = new Set();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
getLogger: LogProvider,
|
getLogger: LogProvider,
|
||||||
maintenanceStatus: IMaintenanceStatus,
|
maintenanceStatus: IMaintenanceStatus,
|
||||||
@ -42,20 +44,31 @@ export class SchedulerService {
|
|||||||
jitter = randomJitter(2 * 1000, 30 * 1000, timeMs),
|
jitter = randomJitter(2 * 1000, 30 * 1000, timeMs),
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const runScheduledFunctionWithEvent = async () => {
|
const runScheduledFunctionWithEvent = async () => {
|
||||||
const startTime = process.hrtime();
|
if (this.executingSchedulers.has(id)) {
|
||||||
await scheduledFunction();
|
// If the job is already executing, don't start another
|
||||||
const endTime = process.hrtime(startTime);
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
this.executingSchedulers.add(id);
|
||||||
|
|
||||||
// Process hrtime returns a list with two numbers representing high-resolution time.
|
const startTime = process.hrtime();
|
||||||
// The first number is the number of seconds, the second is the number of nanoseconds.
|
await scheduledFunction();
|
||||||
// Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds.
|
const endTime = process.hrtime(startTime);
|
||||||
const durationInSeconds = endTime[0] + endTime[1] / 1e9;
|
|
||||||
this.eventBus.emit(SCHEDULER_JOB_TIME, {
|
// Process hrtime returns a list with two numbers representing high-resolution time.
|
||||||
jobId: id,
|
// The first number is the number of seconds, the second is the number of nanoseconds.
|
||||||
time: durationInSeconds,
|
// Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds.
|
||||||
});
|
const durationInSeconds = endTime[0] + endTime[1] / 1e9;
|
||||||
|
this.eventBus.emit(SCHEDULER_JOB_TIME, {
|
||||||
|
jobId: id,
|
||||||
|
time: durationInSeconds,
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
this.executingSchedulers.delete(id);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// scheduled run
|
||||||
this.intervalIds.push(
|
this.intervalIds.push(
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
@ -72,6 +85,8 @@ export class SchedulerService {
|
|||||||
}
|
}
|
||||||
}, timeMs).unref(),
|
}, timeMs).unref(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// initial run with jitter
|
||||||
try {
|
try {
|
||||||
const maintenanceMode =
|
const maintenanceMode =
|
||||||
await this.maintenanceStatus.isMaintenanceMode();
|
await this.maintenanceStatus.isMaintenanceMode();
|
||||||
@ -92,5 +107,6 @@ export class SchedulerService {
|
|||||||
|
|
||||||
stop(): void {
|
stop(): void {
|
||||||
this.intervalIds.forEach(clearInterval);
|
this.intervalIds.forEach(clearInterval);
|
||||||
|
this.executingSchedulers.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user