mirror of
				https://github.com/Unleash/unleash.git
				synced 2025-10-27 11:02:16 +01:00 
			
		
		
		
	feat: scheduled functions observability (#5377)
See linear issue: https://linear.app/unleash/issue/1-1656/add-scheduler-observability As per post mortem actions, we are adding observability to scheduled functions. This PR adds prometheus observability to our scheduled functions via a summary. In addition to timing these functions with the more accurate process.hrtime, we are getting a counter for free per scheduled job id. Prometheus example: <img width="731" alt="Skjermbilde 2023-11-21 kl 13 36 33" src="https://github.com/Unleash/unleash/assets/16081982/08a2064d-5152-4b4f-8a08-eb06e726757a">
This commit is contained in:
		
							parent
							
								
									f8db9098fc
								
							
						
					
					
						commit
						fe4bb4f227
					
				| @ -1,10 +1,12 @@ | ||||
| import { SchedulerService } from './scheduler-service'; | ||||
| import { LogProvider } from '../../logger'; | ||||
| import { LogProvider, Logger } from '../../logger'; | ||||
| import MaintenanceService from '../../services/maintenance-service'; | ||||
| import { createTestConfig } from '../../../test/config/test-config'; | ||||
| import SettingService from '../../services/setting-service'; | ||||
| import FakeSettingStore from '../../../test/fixtures/fake-setting-store'; | ||||
| import EventService from '../../services/event-service'; | ||||
| import { SCHEDULER_JOB_TIME } from '../../metric-events'; | ||||
| import EventEmitter from 'events'; | ||||
| 
 | ||||
| function ms(timeMs) { | ||||
|     return new Promise((resolve) => setTimeout(resolve, timeMs)); | ||||
| @ -36,18 +38,37 @@ const toggleMaintenanceMode = async ( | ||||
|     ); | ||||
| }; | ||||
| 
 | ||||
| test('Schedules job immediately', async () => { | ||||
|     const config = createTestConfig(); | ||||
| const createSchedulerTestService = ({ | ||||
|     loggerOverride, | ||||
|     eventBusOverride, | ||||
| }: { | ||||
|     loggerOverride?: LogProvider; | ||||
|     eventBusOverride?: EventEmitter; | ||||
| } = {}) => { | ||||
|     const config = { | ||||
|         ...createTestConfig(), | ||||
|         eventBus: eventBusOverride || new EventEmitter(), | ||||
|     }; | ||||
| 
 | ||||
|     const logger = loggerOverride || config.getLogger; | ||||
| 
 | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         logger, | ||||
|         maintenanceService, | ||||
|         config.eventBus, | ||||
|     ); | ||||
| 
 | ||||
|     return { schedulerService, maintenanceService }; | ||||
| }; | ||||
| 
 | ||||
| test('Schedules job immediately', async () => { | ||||
|     const { schedulerService } = createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| 
 | ||||
|     await schedulerService.schedule(job, 10, 'test-id'); | ||||
| @ -57,16 +78,8 @@ test('Schedules job immediately', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Does not schedule job immediately when paused', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService, maintenanceService } = | ||||
|         createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| 
 | ||||
| @ -78,16 +91,7 @@ test('Does not schedule job immediately when paused', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Can schedule a single regular job', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService } = createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| 
 | ||||
| @ -99,16 +103,8 @@ test('Can schedule a single regular job', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Scheduled job ignored in a paused mode', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService, maintenanceService } = | ||||
|         createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| 
 | ||||
| @ -121,16 +117,8 @@ test('Scheduled job ignored in a paused mode', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Can resume paused job', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService, maintenanceService } = | ||||
|         createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| 
 | ||||
| @ -144,16 +132,7 @@ test('Can resume paused job', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Can schedule multiple jobs at the same interval', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService } = createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
|     const anotherJob = jest.fn(); | ||||
| @ -168,16 +147,8 @@ test('Can schedule multiple jobs at the same interval', async () => { | ||||
| }); | ||||
| 
 | ||||
| test('Can schedule multiple jobs at the different intervals', async () => { | ||||
|     const config = createTestConfig(); | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|     ); | ||||
|     const { schedulerService } = createSchedulerTestService(); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
|     const anotherJob = jest.fn(); | ||||
| 
 | ||||
| @ -192,13 +163,9 @@ test('Can schedule multiple jobs at the different intervals', async () => { | ||||
| 
 | ||||
| test('Can handle crash of a async job', async () => { | ||||
|     const { logger, getRecords } = getLogger(); | ||||
|     const config = { ...createTestConfig(), logger }; | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService(logger, maintenanceService); | ||||
|     const { schedulerService } = createSchedulerTestService({ | ||||
|         loggerOverride: logger, | ||||
|     }); | ||||
| 
 | ||||
|     const job = async () => { | ||||
|         await Promise.reject('async reason'); | ||||
| @ -216,13 +183,9 @@ test('Can handle crash of a async job', async () => { | ||||
| 
 | ||||
| test('Can handle crash of a sync job', async () => { | ||||
|     const { logger, getRecords } = getLogger(); | ||||
|     const config = { ...createTestConfig(), logger }; | ||||
|     const settingStore = new FakeSettingStore(); | ||||
|     const settingService = new SettingService({ settingStore }, config, { | ||||
|         storeEvent() {}, | ||||
|     } as unknown as EventService); | ||||
|     const maintenanceService = new MaintenanceService(config, settingService); | ||||
|     const schedulerService = new SchedulerService(logger, maintenanceService); | ||||
|     const { schedulerService } = createSchedulerTestService({ | ||||
|         loggerOverride: logger, | ||||
|     }); | ||||
| 
 | ||||
|     const job = () => { | ||||
|         throw new Error('sync reason'); | ||||
| @ -237,3 +200,49 @@ test('Can handle crash of a sync job', async () => { | ||||
|         ['scheduled job failed | id: test-id-11 | Error: sync reason'], | ||||
|     ]); | ||||
| }); | ||||
| 
 | ||||
| test('Can handle crash of a async job', async () => { | ||||
|     const { logger, getRecords } = getLogger(); | ||||
|     const { schedulerService } = createSchedulerTestService({ | ||||
|         loggerOverride: logger, | ||||
|     }); | ||||
| 
 | ||||
|     const job = async () => { | ||||
|         await Promise.reject('async reason'); | ||||
|     }; | ||||
| 
 | ||||
|     await schedulerService.schedule(job, 50, 'test-id-10'); | ||||
|     await ms(75); | ||||
| 
 | ||||
|     schedulerService.stop(); | ||||
|     expect(getRecords()).toEqual([ | ||||
|         ['scheduled job failed | id: test-id-10 | async reason'], | ||||
|         ['scheduled job failed | id: test-id-10 | async reason'], | ||||
|     ]); | ||||
| }); | ||||
| 
 | ||||
| it('should emit scheduler job time event when scheduled function is run', async () => { | ||||
|     const eventBus = new EventEmitter(); | ||||
|     const { schedulerService } = createSchedulerTestService({ | ||||
|         eventBusOverride: eventBus, | ||||
|     }); | ||||
| 
 | ||||
|     const mockJob = async () => { | ||||
|         return Promise.resolve(); | ||||
|     }; | ||||
| 
 | ||||
|     const eventPromise = new Promise((resolve, reject) => { | ||||
|         eventBus.on(SCHEDULER_JOB_TIME, ({ jobId, time }) => { | ||||
|             try { | ||||
|                 expect(jobId).toBe('testJobId'); | ||||
|                 expect(typeof time).toBe('number'); | ||||
|                 resolve(null); | ||||
|             } catch (e) { | ||||
|                 reject(e); | ||||
|             } | ||||
|         }); | ||||
|     }); | ||||
| 
 | ||||
|     await schedulerService.schedule(mockJob, 50, 'testJobId'); | ||||
|     await eventPromise; | ||||
| }); | ||||
|  | ||||
| @ -1,5 +1,7 @@ | ||||
| import EventEmitter from 'events'; | ||||
| import { Logger, LogProvider } from '../../logger'; | ||||
| import MaintenanceService from '../../services/maintenance-service'; | ||||
| import { SCHEDULER_JOB_TIME } from '../../metric-events'; | ||||
| 
 | ||||
| export class SchedulerService { | ||||
|     private intervalIds: NodeJS.Timer[] = []; | ||||
| @ -8,12 +10,16 @@ export class SchedulerService { | ||||
| 
 | ||||
|     private maintenanceService: MaintenanceService; | ||||
| 
 | ||||
|     private eventBus: EventEmitter; | ||||
| 
 | ||||
|     constructor( | ||||
|         getLogger: LogProvider, | ||||
|         maintenanceService: MaintenanceService, | ||||
|         eventBus: EventEmitter, | ||||
|     ) { | ||||
|         this.logger = getLogger('/services/scheduler-service.ts'); | ||||
|         this.maintenanceService = maintenanceService; | ||||
|         this.eventBus = eventBus; | ||||
|     } | ||||
| 
 | ||||
|     async schedule( | ||||
| @ -21,13 +27,28 @@ export class SchedulerService { | ||||
|         timeMs: number, | ||||
|         id: string, | ||||
|     ): Promise<void> { | ||||
|         const runScheduledFunctionWithEvent = async () => { | ||||
|             const startTime = process.hrtime(); | ||||
|             await scheduledFunction(); | ||||
|             const endTime = process.hrtime(startTime); | ||||
| 
 | ||||
|             // Process hrtime returns a list with two numbers representing high-resolution time.
 | ||||
|             // The first number is the number of seconds, the second is the number of nanoseconds.
 | ||||
|             // Since there are 1e9 (1,000,000,000) nanoseconds in a second, endTime[1] / 1e9 converts the nanoseconds to seconds.
 | ||||
|             const durationInSeconds = endTime[0] + endTime[1] / 1e9; | ||||
|             this.eventBus.emit(SCHEDULER_JOB_TIME, { | ||||
|                 jobId: id, | ||||
|                 time: durationInSeconds, | ||||
|             }); | ||||
|         }; | ||||
| 
 | ||||
|         this.intervalIds.push( | ||||
|             setInterval(async () => { | ||||
|                 try { | ||||
|                     const maintenanceMode = | ||||
|                         await this.maintenanceService.isMaintenanceMode(); | ||||
|                     if (!maintenanceMode) { | ||||
|                         await scheduledFunction(); | ||||
|                         await runScheduledFunctionWithEvent(); | ||||
|                     } | ||||
|                 } catch (e) { | ||||
|                     this.logger.error( | ||||
| @ -40,7 +61,7 @@ export class SchedulerService { | ||||
|             const maintenanceMode = | ||||
|                 await this.maintenanceService.isMaintenanceMode(); | ||||
|             if (!maintenanceMode) { | ||||
|                 await scheduledFunction(); | ||||
|                 await runScheduledFunctionWithEvent(); | ||||
|             } | ||||
|         } catch (e) { | ||||
|             this.logger.error(`scheduled job failed | id: ${id} | ${e}`); | ||||
|  | ||||
| @ -1,4 +1,5 @@ | ||||
| const REQUEST_TIME = 'request_time'; | ||||
| const DB_TIME = 'db_time'; | ||||
| const SCHEDULER_JOB_TIME = 'scheduler_job_time'; | ||||
| 
 | ||||
| export { REQUEST_TIME, DB_TIME }; | ||||
| export { REQUEST_TIME, DB_TIME, SCHEDULER_JOB_TIME }; | ||||
|  | ||||
| @ -59,6 +59,14 @@ export default class MetricsMonitor { | ||||
|             maxAgeSeconds: 600, | ||||
|             ageBuckets: 5, | ||||
|         }); | ||||
|         const schedulerDuration = new client.Summary({ | ||||
|             name: 'scheduler_duration_seconds', | ||||
|             help: 'Scheduler duration time', | ||||
|             labelNames: ['jobId'], | ||||
|             percentiles: [0.1, 0.5, 0.9, 0.95, 0.99], | ||||
|             maxAgeSeconds: 600, | ||||
|             ageBuckets: 5, | ||||
|         }); | ||||
|         const dbDuration = new client.Summary({ | ||||
|             name: 'db_query_duration_seconds', | ||||
|             help: 'DB query duration time', | ||||
| @ -321,6 +329,10 @@ export default class MetricsMonitor { | ||||
|             }, | ||||
|         ); | ||||
| 
 | ||||
|         eventBus.on(events.SCHEDULER_JOB_TIME, ({ jobId, time }) => { | ||||
|             schedulerDuration.labels(jobId).observe(time); | ||||
|         }); | ||||
| 
 | ||||
|         eventBus.on('optimal304Differ', ({ status }) => { | ||||
|             optimal304DiffingCounter.labels(status).inc(); | ||||
|         }); | ||||
|  | ||||
| @ -290,6 +290,7 @@ export const createServices = ( | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|         config.eventBus, | ||||
|     ); | ||||
| 
 | ||||
|     const eventAnnouncerService = new EventAnnouncerService(stores, config); | ||||
|  | ||||
| @ -15,6 +15,7 @@ test('Scheduler should run scheduled functions if maintenance mode is off', asyn | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|         config.eventBus, | ||||
|     ); | ||||
| 
 | ||||
|     const job = jest.fn(); | ||||
| @ -35,6 +36,7 @@ test('Scheduler should not run scheduled functions if maintenance mode is on', a | ||||
|     const schedulerService = new SchedulerService( | ||||
|         config.getLogger, | ||||
|         maintenanceService, | ||||
|         config.eventBus, | ||||
|     ); | ||||
| 
 | ||||
|     await maintenanceService.toggleMaintenanceMode( | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user