diff --git a/src/lib/features/scheduler/scheduler-service.test.ts b/src/lib/features/scheduler/scheduler-service.test.ts index 6d2ecaa952..8e894ed573 100644 --- a/src/lib/features/scheduler/scheduler-service.test.ts +++ b/src/lib/features/scheduler/scheduler-service.test.ts @@ -69,10 +69,11 @@ const createSchedulerTestService = ({ test('Schedules job immediately', async () => { const { schedulerService } = createSchedulerTestService(); + const NO_JITTER = 0; const job = jest.fn(); - await schedulerService.schedule(job, 10, 'test-id', 0); + await schedulerService.schedule(job, 10, 'test-id', NO_JITTER); expect(job).toBeCalledTimes(1); schedulerService.stop(); @@ -262,3 +263,23 @@ test('Does not apply jitter if schedule interval is smaller than max jitter', as schedulerService.stop(); }); + +test('Does not allow to run scheduled job when it is already pending', async () => { + const { schedulerService } = createSchedulerTestService(); + const NO_JITTER = 0; + + const job = jest.fn(); + const slowJob = async () => { + job(); + await ms(25); + }; + + void schedulerService.schedule(slowJob, 10, 'test-id', NO_JITTER); + + // scheduler had 2 chances to run but the initial slowJob was pending + await ms(25); + + expect(job).toBeCalledTimes(1); + + schedulerService.stop(); +}); diff --git a/src/lib/features/scheduler/scheduler-service.ts b/src/lib/features/scheduler/scheduler-service.ts index aa45dbe8e1..c4511e3ab0 100644 --- a/src/lib/features/scheduler/scheduler-service.ts +++ b/src/lib/features/scheduler/scheduler-service.ts @@ -25,6 +25,8 @@ export class SchedulerService { private eventBus: EventEmitter; + private executingSchedulers: Set = new Set(); + constructor( getLogger: LogProvider, maintenanceStatus: IMaintenanceStatus, @@ -42,20 +44,31 @@ export class SchedulerService { jitter = randomJitter(2 * 1000, 30 * 1000, timeMs), ): Promise { const runScheduledFunctionWithEvent = async () => { - const startTime = process.hrtime(); - await scheduledFunction(); - const endTime = process.hrtime(startTime); + if (this.executingSchedulers.has(id)) { + // If the job is already executing, don't start another + return; + } + try { + this.executingSchedulers.add(id); - // Process hrtime returns a list with two numbers representing high-resolution time. - // The first number is the number of seconds, the second is the number of nanoseconds. - // Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds. - const durationInSeconds = endTime[0] + endTime[1] / 1e9; - this.eventBus.emit(SCHEDULER_JOB_TIME, { - jobId: id, - time: durationInSeconds, - }); + const startTime = process.hrtime(); + await scheduledFunction(); + const endTime = process.hrtime(startTime); + + // Process hrtime returns a list with two numbers representing high-resolution time. + // The first number is the number of seconds, the second is the number of nanoseconds. + // Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds. + const durationInSeconds = endTime[0] + endTime[1] / 1e9; + this.eventBus.emit(SCHEDULER_JOB_TIME, { + jobId: id, + time: durationInSeconds, + }); + } finally { + this.executingSchedulers.delete(id); + } }; + // scheduled run this.intervalIds.push( setInterval(async () => { try { @@ -72,6 +85,8 @@ export class SchedulerService { } }, timeMs).unref(), ); + + // initial run with jitter try { const maintenanceMode = await this.maintenanceStatus.isMaintenanceMode(); @@ -92,5 +107,6 @@ export class SchedulerService { stop(): void { this.intervalIds.forEach(clearInterval); + this.executingSchedulers.clear(); } }