diff --git a/src/lib/features/scheduler/scheduler-service.test.ts b/src/lib/features/scheduler/scheduler-service.test.ts index 55f0fdd7f7..323481bc7f 100644 --- a/src/lib/features/scheduler/scheduler-service.test.ts +++ b/src/lib/features/scheduler/scheduler-service.test.ts @@ -1,10 +1,12 @@ import { SchedulerService } from './scheduler-service'; -import { LogProvider } from '../../logger'; +import { LogProvider, Logger } from '../../logger'; import MaintenanceService from '../../services/maintenance-service'; import { createTestConfig } from '../../../test/config/test-config'; import SettingService from '../../services/setting-service'; import FakeSettingStore from '../../../test/fixtures/fake-setting-store'; import EventService from '../../services/event-service'; +import { SCHEDULER_JOB_TIME } from '../../metric-events'; +import EventEmitter from 'events'; function ms(timeMs) { return new Promise((resolve) => setTimeout(resolve, timeMs)); @@ -36,18 +38,37 @@ const toggleMaintenanceMode = async ( ); }; -test('Schedules job immediately', async () => { - const config = createTestConfig(); +const createSchedulerTestService = ({ + loggerOverride, + eventBusOverride, +}: { + loggerOverride?: LogProvider; + eventBusOverride?: EventEmitter; +} = {}) => { + const config = { + ...createTestConfig(), + eventBus: eventBusOverride || new EventEmitter(), + }; + + const logger = loggerOverride || config.getLogger; + const settingStore = new FakeSettingStore(); const settingService = new SettingService({ settingStore }, config, { storeEvent() {}, } as unknown as EventService); const maintenanceService = new MaintenanceService(config, settingService); const schedulerService = new SchedulerService( - config.getLogger, + logger, maintenanceService, + config.eventBus, ); + return { schedulerService, maintenanceService }; +}; + +test('Schedules job immediately', async () => { + const { schedulerService } = createSchedulerTestService(); + const job = jest.fn(); await schedulerService.schedule(job, 10, 'test-id'); @@ -57,16 +78,8 @@ test('Schedules job immediately', async () => { }); test('Does not schedule job immediately when paused', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService, maintenanceService } = + createSchedulerTestService(); const job = jest.fn(); @@ -78,16 +91,7 @@ test('Does not schedule job immediately when paused', async () => { }); test('Can schedule a single regular job', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService } = createSchedulerTestService(); const job = jest.fn(); @@ -99,16 +103,8 @@ test('Can schedule a single regular job', async () => { }); test('Scheduled job ignored in a paused mode', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService, maintenanceService } = + createSchedulerTestService(); const job = jest.fn(); @@ -121,16 +117,8 @@ test('Scheduled job ignored in a paused mode', async () => { }); test('Can resume paused job', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService, maintenanceService } = + createSchedulerTestService(); const job = jest.fn(); @@ -144,16 +132,7 @@ test('Can resume paused job', async () => { }); test('Can schedule multiple jobs at the same interval', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService } = createSchedulerTestService(); const job = jest.fn(); const anotherJob = jest.fn(); @@ -168,16 +147,8 @@ test('Can schedule multiple jobs at the same interval', async () => { }); test('Can schedule multiple jobs at the different intervals', async () => { - const config = createTestConfig(); - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService( - config.getLogger, - maintenanceService, - ); + const { schedulerService } = createSchedulerTestService(); + const job = jest.fn(); const anotherJob = jest.fn(); @@ -192,13 +163,9 @@ test('Can schedule multiple jobs at the different intervals', async () => { test('Can handle crash of a async job', async () => { const { logger, getRecords } = getLogger(); - const config = { ...createTestConfig(), logger }; - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService(logger, maintenanceService); + const { schedulerService } = createSchedulerTestService({ + loggerOverride: logger, + }); const job = async () => { await Promise.reject('async reason'); @@ -216,13 +183,9 @@ test('Can handle crash of a async job', async () => { test('Can handle crash of a sync job', async () => { const { logger, getRecords } = getLogger(); - const config = { ...createTestConfig(), logger }; - const settingStore = new FakeSettingStore(); - const settingService = new SettingService({ settingStore }, config, { - storeEvent() {}, - } as unknown as EventService); - const maintenanceService = new MaintenanceService(config, settingService); - const schedulerService = new SchedulerService(logger, maintenanceService); + const { schedulerService } = createSchedulerTestService({ + loggerOverride: logger, + }); const job = () => { throw new Error('sync reason'); @@ -237,3 +200,49 @@ test('Can handle crash of a sync job', async () => { ['scheduled job failed | id: test-id-11 | Error: sync reason'], ]); }); + +test('Can handle crash of a async job', async () => { + const { logger, getRecords } = getLogger(); + const { schedulerService } = createSchedulerTestService({ + loggerOverride: logger, + }); + + const job = async () => { + await Promise.reject('async reason'); + }; + + await schedulerService.schedule(job, 50, 'test-id-10'); + await ms(75); + + schedulerService.stop(); + expect(getRecords()).toEqual([ + ['scheduled job failed | id: test-id-10 | async reason'], + ['scheduled job failed | id: test-id-10 | async reason'], + ]); +}); + +it('should emit scheduler job time event when scheduled function is run', async () => { + const eventBus = new EventEmitter(); + const { schedulerService } = createSchedulerTestService({ + eventBusOverride: eventBus, + }); + + const mockJob = async () => { + return Promise.resolve(); + }; + + const eventPromise = new Promise((resolve, reject) => { + eventBus.on(SCHEDULER_JOB_TIME, ({ jobId, time }) => { + try { + expect(jobId).toBe('testJobId'); + expect(typeof time).toBe('number'); + resolve(null); + } catch (e) { + reject(e); + } + }); + }); + + await schedulerService.schedule(mockJob, 50, 'testJobId'); + await eventPromise; +}); diff --git a/src/lib/features/scheduler/scheduler-service.ts b/src/lib/features/scheduler/scheduler-service.ts index 20f4a94a3c..155ef0fa13 100644 --- a/src/lib/features/scheduler/scheduler-service.ts +++ b/src/lib/features/scheduler/scheduler-service.ts @@ -1,5 +1,7 @@ +import EventEmitter from 'events'; import { Logger, LogProvider } from '../../logger'; import MaintenanceService from '../../services/maintenance-service'; +import { SCHEDULER_JOB_TIME } from '../../metric-events'; export class SchedulerService { private intervalIds: NodeJS.Timer[] = []; @@ -8,12 +10,16 @@ export class SchedulerService { private maintenanceService: MaintenanceService; + private eventBus: EventEmitter; + constructor( getLogger: LogProvider, maintenanceService: MaintenanceService, + eventBus: EventEmitter, ) { this.logger = getLogger('/services/scheduler-service.ts'); this.maintenanceService = maintenanceService; + this.eventBus = eventBus; } async schedule( @@ -21,13 +27,28 @@ export class SchedulerService { timeMs: number, id: string, ): Promise { + const runScheduledFunctionWithEvent = async () => { + const startTime = process.hrtime(); + await scheduledFunction(); + const endTime = process.hrtime(startTime); + + // Process hrtime returns a list with two numbers representing high-resolution time. + // The first number is the number of seconds, the second is the number of nanoseconds. + // Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds. + const durationInSeconds = endTime[0] + endTime[1] / 1e9; + this.eventBus.emit(SCHEDULER_JOB_TIME, { + jobId: id, + time: durationInSeconds, + }); + }; + this.intervalIds.push( setInterval(async () => { try { const maintenanceMode = await this.maintenanceService.isMaintenanceMode(); if (!maintenanceMode) { - await scheduledFunction(); + await runScheduledFunctionWithEvent(); } } catch (e) { this.logger.error( @@ -40,7 +61,7 @@ export class SchedulerService { const maintenanceMode = await this.maintenanceService.isMaintenanceMode(); if (!maintenanceMode) { - await scheduledFunction(); + await runScheduledFunctionWithEvent(); } } catch (e) { this.logger.error(`scheduled job failed | id: ${id} | ${e}`); diff --git a/src/lib/metric-events.ts b/src/lib/metric-events.ts index 0b9c891ee2..75d7347d79 100644 --- a/src/lib/metric-events.ts +++ b/src/lib/metric-events.ts @@ -1,4 +1,5 @@ const REQUEST_TIME = 'request_time'; const DB_TIME = 'db_time'; +const SCHEDULER_JOB_TIME = 'scheduler_job_time'; -export { REQUEST_TIME, DB_TIME }; +export { REQUEST_TIME, DB_TIME, SCHEDULER_JOB_TIME }; diff --git a/src/lib/metrics.ts b/src/lib/metrics.ts index dafc40a484..98f0bad420 100644 --- a/src/lib/metrics.ts +++ b/src/lib/metrics.ts @@ -59,6 +59,14 @@ export default class MetricsMonitor { maxAgeSeconds: 600, ageBuckets: 5, }); + const schedulerDuration = new client.Summary({ + name: 'scheduler_duration_seconds', + help: 'Scheduler duration time', + labelNames: ['jobId'], + percentiles: [0.1, 0.5, 0.9, 0.95, 0.99], + maxAgeSeconds: 600, + ageBuckets: 5, + }); const dbDuration = new client.Summary({ name: 'db_query_duration_seconds', help: 'DB query duration time', @@ -321,6 +329,10 @@ export default class MetricsMonitor { }, ); + eventBus.on(events.SCHEDULER_JOB_TIME, ({ jobId, time }) => { + schedulerDuration.labels(jobId).observe(time); + }); + eventBus.on('optimal304Differ', ({ status }) => { optimal304DiffingCounter.labels(status).inc(); }); diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index db7ef63b27..878671371f 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -290,6 +290,7 @@ export const createServices = ( const schedulerService = new SchedulerService( config.getLogger, maintenanceService, + config.eventBus, ); const eventAnnouncerService = new EventAnnouncerService(stores, config); diff --git a/src/lib/services/maintenance-service.test.ts b/src/lib/services/maintenance-service.test.ts index fc4be7e720..ddbeaf11b1 100644 --- a/src/lib/services/maintenance-service.test.ts +++ b/src/lib/services/maintenance-service.test.ts @@ -15,6 +15,7 @@ test('Scheduler should run scheduled functions if maintenance mode is off', asyn const schedulerService = new SchedulerService( config.getLogger, maintenanceService, + config.eventBus, ); const job = jest.fn(); @@ -35,6 +36,7 @@ test('Scheduler should not run scheduled functions if maintenance mode is on', a const schedulerService = new SchedulerService( config.getLogger, maintenanceService, + config.eventBus, ); await maintenanceService.toggleMaintenanceMode(