mirror of
https://github.com/Unleash/unleash.git
synced 2025-04-10 01:16:39 +02:00
fix: scheduler job runtime control (#5363)
## PR Description
https://linear.app/unleash/issue/2-1645/address-post-mortem-action-point-all-flags-should-be-runtime
Refactor with the goal of ensuring that flags are runtime controllable,
mostly focused on the current scheduler logic.
This includes the following changes:
- Moves scheduler into its own "scheduler" feature folder
- Reverts dependency: SchedulerService takes in the MaintenanceService,
not the other way around
- Scheduler now evaluates maintenance mode at runtime instead of relying
only on its mode state (active / paused)
- Favors flag checks to happen inside the scheduled methods, instead of
controlling whether the method is scheduled at all (favor runtime over
startup)
- Moves "account last seen update" to scheduler
- Updates tests accordingly
- Boyscouting
Here's a manual test showing this behavior, where my local instance was
controlled by a remote instance. Whenever I toggle `maintenanceMode`
through a flag remotely, my scheduled functions stop running:
https://github.com/Unleash/unleash/assets/14320932/ae0a7fa9-5165-4c0b-9b0b-53b9fb20de72
Had a look through all of our current flags and it *seems to me* that
they are all used in a runtime controllable way, but would still feel
more comfortable if this was double checked, since it can be complex to
ensure this.
The only exception to this was `migrationLock`, which I believe is OK,
since the migration only happens at the start anyways.
## Discussion / Questions
~~Scheduler `mode` (active / paused) is currently not *really* being
used, along with its respective methods, except in tests. I think this
could be a potential footgun. Should we remove it in favor of only
controlling the scheduler state through maintenance mode?~~ Addressed in
7c52e3f638
~~The config property `disableScheduler` is still a startup
configuration, but perhaps that makes sense to leave as is?~~
[Answered](https://github.com/Unleash/unleash/pull/5363#issuecomment-1819005445)
by @FredrikOseberg, leaving as is.
Are there any other tests we should add?
Is there anything I missed?
Identified some `setInterval` and `setTimeout` that may make sense to
leave as is instead of moving over to the scheduler service:
- ~~`src/lib/metrics` - This is currently considered a `MetricsMonitor`.
Should this be refactored to a service instead and adapt these
setIntervals to use the scheduler instead? Is there anything special
with this we need to take into account? @chriswk @ivarconr~~
[Answered](https://github.com/Unleash/unleash/pull/5363#issuecomment-1820501511)
by @ivarconr, leaving as is.
- ~~`src/lib/proxy/proxy-repository.ts` - This seems to have a complex
and specific logic currently. Perhaps we should leave it alone for now?
@FredrikOseberg~~
[Answered](https://github.com/Unleash/unleash/pull/5363#issuecomment-1819005445)
by @FredrikOseberg, leaving as is.
- `src/lib/services/user-service.ts` - This one also seems to be a bit
more specific, where we generate new timeouts for each receiver id.
Might not belong in the scheduler service. @Tymek
This commit is contained in:
parent
27252f7728
commit
ae375703d2
153
src/lib/features/scheduler/schedule-services.ts
Normal file
153
src/lib/features/scheduler/schedule-services.ts
Normal file
@ -0,0 +1,153 @@
|
||||
import {
|
||||
hoursToMilliseconds,
|
||||
minutesToMilliseconds,
|
||||
secondsToMilliseconds,
|
||||
} from 'date-fns';
|
||||
import { IUnleashServices } from '../../server-impl';
|
||||
|
||||
/**
|
||||
* Schedules service methods.
|
||||
*
|
||||
* In order to promote runtime control, you should **not use** a flagResolver inside this method. Instead, implement your flag usage inside the scheduled methods themselves.
|
||||
* @param services
|
||||
*/
|
||||
export const scheduleServices = async (
|
||||
services: IUnleashServices,
|
||||
): Promise<void> => {
|
||||
const {
|
||||
accountService,
|
||||
schedulerService,
|
||||
apiTokenService,
|
||||
instanceStatsService,
|
||||
clientInstanceService,
|
||||
projectService,
|
||||
projectHealthService,
|
||||
configurationRevisionService,
|
||||
eventAnnouncerService,
|
||||
featureToggleService,
|
||||
versionService,
|
||||
lastSeenService,
|
||||
proxyService,
|
||||
clientMetricsServiceV2,
|
||||
} = services;
|
||||
|
||||
schedulerService.schedule(
|
||||
lastSeenService.cleanLastSeen.bind(lastSeenService),
|
||||
hoursToMilliseconds(1),
|
||||
'cleanLastSeen',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
lastSeenService.store.bind(lastSeenService),
|
||||
secondsToMilliseconds(30),
|
||||
'storeLastSeen',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
apiTokenService.fetchActiveTokens.bind(apiTokenService),
|
||||
minutesToMilliseconds(1),
|
||||
'fetchActiveTokens',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
apiTokenService.updateLastSeen.bind(apiTokenService),
|
||||
minutesToMilliseconds(3),
|
||||
'updateLastSeen',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
instanceStatsService.refreshStatsSnapshot.bind(instanceStatsService),
|
||||
minutesToMilliseconds(5),
|
||||
'refreshStatsSnapshot',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.removeInstancesOlderThanTwoDays.bind(
|
||||
clientInstanceService,
|
||||
),
|
||||
hoursToMilliseconds(24),
|
||||
'removeInstancesOlderThanTwoDays',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.bulkAdd.bind(clientInstanceService),
|
||||
secondsToMilliseconds(5),
|
||||
'bulkAddInstances',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.announceUnannounced.bind(clientInstanceService),
|
||||
minutesToMilliseconds(5),
|
||||
'announceUnannounced',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
projectService.statusJob.bind(projectService),
|
||||
hoursToMilliseconds(24),
|
||||
'statusJob',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
projectHealthService.setHealthRating.bind(projectHealthService),
|
||||
hoursToMilliseconds(1),
|
||||
'setHealthRating',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
configurationRevisionService.updateMaxRevisionId.bind(
|
||||
configurationRevisionService,
|
||||
),
|
||||
secondsToMilliseconds(1),
|
||||
'updateMaxRevisionId',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
eventAnnouncerService.publishUnannouncedEvents.bind(
|
||||
eventAnnouncerService,
|
||||
),
|
||||
secondsToMilliseconds(1),
|
||||
'publishUnannouncedEvents',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
featureToggleService.updatePotentiallyStaleFeatures.bind(
|
||||
featureToggleService,
|
||||
),
|
||||
minutesToMilliseconds(1),
|
||||
'updatePotentiallyStaleFeatures',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
versionService.checkLatestVersion.bind(versionService),
|
||||
hoursToMilliseconds(48),
|
||||
'checkLatestVersion',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
proxyService.fetchFrontendSettings.bind(proxyService),
|
||||
minutesToMilliseconds(2),
|
||||
'fetchFrontendSettings',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.bulkAdd().catch(console.error);
|
||||
},
|
||||
secondsToMilliseconds(5),
|
||||
'bulkAddMetrics',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.clearMetrics(48).catch(console.error);
|
||||
},
|
||||
hoursToMilliseconds(12),
|
||||
'clearMetrics',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
accountService.updateLastSeen.bind(accountService),
|
||||
minutesToMilliseconds(3),
|
||||
'updateAccountLastSeen',
|
||||
);
|
||||
};
|
239
src/lib/features/scheduler/scheduler-service.test.ts
Normal file
239
src/lib/features/scheduler/scheduler-service.test.ts
Normal file
@ -0,0 +1,239 @@
|
||||
import { SchedulerService } from './scheduler-service';
|
||||
import { LogProvider } from '../../logger';
|
||||
import MaintenanceService from '../../services/maintenance-service';
|
||||
import { createTestConfig } from '../../../test/config/test-config';
|
||||
import SettingService from '../../services/setting-service';
|
||||
import FakeSettingStore from '../../../test/fixtures/fake-setting-store';
|
||||
import EventService from '../../services/event-service';
|
||||
|
||||
function ms(timeMs) {
|
||||
return new Promise((resolve) => setTimeout(resolve, timeMs));
|
||||
}
|
||||
|
||||
const getLogger = () => {
|
||||
const records: any[] = [];
|
||||
const logger: LogProvider = () => ({
|
||||
error(...args: any[]) {
|
||||
records.push(args);
|
||||
},
|
||||
debug() {},
|
||||
info() {},
|
||||
warn() {},
|
||||
fatal() {},
|
||||
});
|
||||
const getRecords = () => records;
|
||||
|
||||
return { logger, getRecords };
|
||||
};
|
||||
|
||||
const toggleMaintenanceMode = async (
|
||||
maintenanceService: MaintenanceService,
|
||||
enabled: boolean,
|
||||
) => {
|
||||
await maintenanceService.toggleMaintenanceMode(
|
||||
{ enabled },
|
||||
'irrelevant user',
|
||||
);
|
||||
};
|
||||
|
||||
test('Schedules job immediately', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 10, 'test-id');
|
||||
|
||||
expect(job).toBeCalledTimes(1);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Does not schedule job immediately when paused', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await toggleMaintenanceMode(maintenanceService, true);
|
||||
await schedulerService.schedule(job, 10, 'test-id-2');
|
||||
|
||||
expect(job).toBeCalledTimes(0);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule a single regular job', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 50, 'test-id-3');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Scheduled job ignored in a paused mode', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await toggleMaintenanceMode(maintenanceService, true);
|
||||
await schedulerService.schedule(job, 50, 'test-id-4');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(0);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can resume paused job', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await toggleMaintenanceMode(maintenanceService, true);
|
||||
await schedulerService.schedule(job, 50, 'test-id-5');
|
||||
await toggleMaintenanceMode(maintenanceService, false);
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(1);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule multiple jobs at the same interval', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
const anotherJob = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 50, 'test-id-6');
|
||||
await schedulerService.schedule(anotherJob, 50, 'test-id-7');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(2);
|
||||
expect(anotherJob).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule multiple jobs at the different intervals', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
const job = jest.fn();
|
||||
const anotherJob = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 100, 'test-id-8');
|
||||
await schedulerService.schedule(anotherJob, 200, 'test-id-9');
|
||||
await ms(250);
|
||||
|
||||
expect(job).toBeCalledTimes(3);
|
||||
expect(anotherJob).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can handle crash of a async job', async () => {
|
||||
const { logger, getRecords } = getLogger();
|
||||
const config = { ...createTestConfig(), logger };
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(logger, maintenanceService);
|
||||
|
||||
const job = async () => {
|
||||
await Promise.reject('async reason');
|
||||
};
|
||||
|
||||
await schedulerService.schedule(job, 50, 'test-id-10');
|
||||
await ms(75);
|
||||
|
||||
schedulerService.stop();
|
||||
expect(getRecords()).toEqual([
|
||||
['scheduled job failed | id: test-id-10 | async reason'],
|
||||
['scheduled job failed | id: test-id-10 | async reason'],
|
||||
]);
|
||||
});
|
||||
|
||||
test('Can handle crash of a sync job', async () => {
|
||||
const { logger, getRecords } = getLogger();
|
||||
const config = { ...createTestConfig(), logger };
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(logger, maintenanceService);
|
||||
|
||||
const job = () => {
|
||||
throw new Error('sync reason');
|
||||
};
|
||||
|
||||
await schedulerService.schedule(job, 50, 'test-id-11');
|
||||
await ms(75);
|
||||
|
||||
schedulerService.stop();
|
||||
expect(getRecords()).toEqual([
|
||||
['scheduled job failed | id: test-id-11 | Error: sync reason'],
|
||||
['scheduled job failed | id: test-id-11 | Error: sync reason'],
|
||||
]);
|
||||
});
|
@ -1,17 +1,19 @@
|
||||
import { Logger, LogProvider } from '../logger';
|
||||
|
||||
export type SchedulerMode = 'active' | 'paused';
|
||||
import { Logger, LogProvider } from '../../logger';
|
||||
import MaintenanceService from '../../services/maintenance-service';
|
||||
|
||||
export class SchedulerService {
|
||||
private intervalIds: NodeJS.Timer[] = [];
|
||||
|
||||
private mode: SchedulerMode;
|
||||
|
||||
private logger: Logger;
|
||||
|
||||
constructor(getLogger: LogProvider) {
|
||||
private maintenanceService: MaintenanceService;
|
||||
|
||||
constructor(
|
||||
getLogger: LogProvider,
|
||||
maintenanceService: MaintenanceService,
|
||||
) {
|
||||
this.logger = getLogger('/services/scheduler-service.ts');
|
||||
this.mode = 'active';
|
||||
this.maintenanceService = maintenanceService;
|
||||
}
|
||||
|
||||
async schedule(
|
||||
@ -22,7 +24,9 @@ export class SchedulerService {
|
||||
this.intervalIds.push(
|
||||
setInterval(async () => {
|
||||
try {
|
||||
if (this.mode === 'active') {
|
||||
const maintenanceMode =
|
||||
await this.maintenanceService.isMaintenanceMode();
|
||||
if (!maintenanceMode) {
|
||||
await scheduledFunction();
|
||||
}
|
||||
} catch (e) {
|
||||
@ -33,7 +37,9 @@ export class SchedulerService {
|
||||
}, timeMs).unref(),
|
||||
);
|
||||
try {
|
||||
if (this.mode === 'active') {
|
||||
const maintenanceMode =
|
||||
await this.maintenanceService.isMaintenanceMode();
|
||||
if (!maintenanceMode) {
|
||||
await scheduledFunction();
|
||||
}
|
||||
} catch (e) {
|
||||
@ -44,16 +50,4 @@ export class SchedulerService {
|
||||
stop(): void {
|
||||
this.intervalIds.forEach(clearInterval);
|
||||
}
|
||||
|
||||
pause(): void {
|
||||
this.mode = 'paused';
|
||||
}
|
||||
|
||||
resume(): void {
|
||||
this.mode = 'active';
|
||||
}
|
||||
|
||||
getMode(): SchedulerMode {
|
||||
return this.mode;
|
||||
}
|
||||
}
|
@ -4,15 +4,9 @@ import { createTestConfig } from '../../test/config/test-config';
|
||||
import FakeEventStore from '../../test/fixtures/fake-event-store';
|
||||
import { randomId } from '../util/random-id';
|
||||
import FakeProjectStore from '../../test/fixtures/fake-project-store';
|
||||
import {
|
||||
EventService,
|
||||
ProxyService,
|
||||
SchedulerService,
|
||||
SettingService,
|
||||
} from '../../lib/services';
|
||||
import { EventService, ProxyService, SettingService } from '../../lib/services';
|
||||
import { ISettingStore } from '../../lib/types';
|
||||
import { frontendSettingsKey } from '../../lib/types/settings/frontend-settings';
|
||||
import { minutesToMilliseconds } from 'date-fns';
|
||||
import FakeFeatureTagStore from '../../test/fixtures/fake-feature-tag-store';
|
||||
|
||||
const createSettingService = (
|
||||
|
@ -5,7 +5,7 @@ import { migrateDb } from '../migrator';
|
||||
import getApp from './app';
|
||||
import { createMetricsMonitor } from './metrics';
|
||||
import { createStores } from './db';
|
||||
import { createServices, scheduleServices } from './services';
|
||||
import { createServices } from './services';
|
||||
import { createConfig } from './create-config';
|
||||
import registerGracefulShutdown from './util/graceful-shutdown';
|
||||
import { createDb } from './db/db-pool';
|
||||
@ -33,6 +33,7 @@ import * as permissions from './types/permissions';
|
||||
import * as eventType from './types/events';
|
||||
import { Db } from './db/db';
|
||||
import { defaultLockKey, defaultTimeout, withDbLock } from './util/db-lock';
|
||||
import { scheduleServices } from './features/scheduler/schedule-services';
|
||||
|
||||
async function createApp(
|
||||
config: IUnleashConfig,
|
||||
@ -45,7 +46,7 @@ async function createApp(
|
||||
const stores = createStores(config, db);
|
||||
const services = createServices(stores, config, db);
|
||||
if (!config.disableScheduler) {
|
||||
await scheduleServices(services, config.flagResolver);
|
||||
await scheduleServices(services);
|
||||
}
|
||||
|
||||
const metricsMonitor = createMetricsMonitor();
|
||||
|
@ -18,8 +18,6 @@ export class AccountService {
|
||||
|
||||
private accessService: AccessService;
|
||||
|
||||
private seenTimer: NodeJS.Timeout;
|
||||
|
||||
private lastSeenSecrets: Set<string> = new Set<string>();
|
||||
|
||||
constructor(
|
||||
@ -32,7 +30,6 @@ export class AccountService {
|
||||
this.logger = getLogger('service/account-service.ts');
|
||||
this.store = stores.accountStore;
|
||||
this.accessService = services.accessService;
|
||||
this.updateLastSeen();
|
||||
}
|
||||
|
||||
async getAll(): Promise<IUserWithRole[]> {
|
||||
@ -63,19 +60,9 @@ export class AccountService {
|
||||
this.lastSeenSecrets = new Set<string>();
|
||||
await this.store.markSeenAt(toStore);
|
||||
}
|
||||
|
||||
this.seenTimer = setTimeout(
|
||||
async () => this.updateLastSeen(),
|
||||
minutesToMilliseconds(3),
|
||||
).unref();
|
||||
}
|
||||
|
||||
addPATSeen(secret: string): void {
|
||||
this.lastSeenSecrets.add(secret);
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
clearTimeout(this.seenTimer);
|
||||
this.seenTimer = null;
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,12 @@
|
||||
import { secondsToMilliseconds } from 'date-fns';
|
||||
import { Logger } from '../../../logger';
|
||||
import { IUnleashConfig } from '../../../server-impl';
|
||||
import { IClientMetricsEnv } from '../../../types/stores/client-metrics-store-v2';
|
||||
import { ILastSeenStore } from './types/last-seen-store-type';
|
||||
import { IFeatureToggleStore, IUnleashStores } from '../../../../lib/types';
|
||||
import {
|
||||
IFeatureToggleStore,
|
||||
IFlagResolver,
|
||||
IUnleashStores,
|
||||
} from '../../../../lib/types';
|
||||
|
||||
export type LastSeenInput = {
|
||||
featureName: string;
|
||||
@ -21,6 +24,8 @@ export class LastSeenService {
|
||||
|
||||
private config: IUnleashConfig;
|
||||
|
||||
private flagResolver: IFlagResolver;
|
||||
|
||||
constructor(
|
||||
{
|
||||
featureToggleStore,
|
||||
@ -33,6 +38,7 @@ export class LastSeenService {
|
||||
this.logger = config.getLogger(
|
||||
'/services/client-metrics/last-seen-service.ts',
|
||||
);
|
||||
this.flagResolver = config.flagResolver;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@ -75,6 +81,8 @@ export class LastSeenService {
|
||||
}
|
||||
|
||||
async cleanLastSeen() {
|
||||
await this.lastSeenStore.cleanLastSeen();
|
||||
if (this.flagResolver.isEnabled('useLastSeenRefactor')) {
|
||||
await this.lastSeenStore.cleanLastSeen();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,4 @@
|
||||
import {
|
||||
IUnleashConfig,
|
||||
IUnleashStores,
|
||||
IUnleashServices,
|
||||
IFlagResolver,
|
||||
} from '../types';
|
||||
import { IUnleashConfig, IUnleashStores, IUnleashServices } from '../types';
|
||||
import FeatureTypeService from './feature-type-service';
|
||||
import EventService from './event-service';
|
||||
import HealthService from './health-service';
|
||||
@ -44,13 +39,8 @@ import { LastSeenService } from './client-metrics/last-seen/last-seen-service';
|
||||
import { InstanceStatsService } from '../features/instance-stats/instance-stats-service';
|
||||
import { FavoritesService } from './favorites-service';
|
||||
import MaintenanceService from './maintenance-service';
|
||||
import {
|
||||
hoursToMilliseconds,
|
||||
minutesToMilliseconds,
|
||||
secondsToMilliseconds,
|
||||
} from 'date-fns';
|
||||
import { AccountService } from './account-service';
|
||||
import { SchedulerService } from './scheduler-service';
|
||||
import { SchedulerService } from '../features/scheduler/scheduler-service';
|
||||
import { Knex } from 'knex';
|
||||
import {
|
||||
createExportImportTogglesService,
|
||||
@ -109,149 +99,6 @@ import {
|
||||
} from '../features/feature-search/createFeatureSearchService';
|
||||
import { FeatureSearchService } from '../features/feature-search/feature-search-service';
|
||||
|
||||
// TODO: will be moved to scheduler feature directory
|
||||
export const scheduleServices = async (
|
||||
services: IUnleashServices,
|
||||
flagResolver: IFlagResolver,
|
||||
): Promise<void> => {
|
||||
const {
|
||||
schedulerService,
|
||||
apiTokenService,
|
||||
instanceStatsService,
|
||||
clientInstanceService,
|
||||
projectService,
|
||||
projectHealthService,
|
||||
configurationRevisionService,
|
||||
maintenanceService,
|
||||
eventAnnouncerService,
|
||||
featureToggleService,
|
||||
versionService,
|
||||
lastSeenService,
|
||||
proxyService,
|
||||
clientMetricsServiceV2,
|
||||
} = services;
|
||||
|
||||
if (await maintenanceService.isMaintenanceMode()) {
|
||||
schedulerService.pause();
|
||||
}
|
||||
|
||||
if (flagResolver.isEnabled('useLastSeenRefactor')) {
|
||||
schedulerService.schedule(
|
||||
lastSeenService.cleanLastSeen.bind(lastSeenService),
|
||||
hoursToMilliseconds(1),
|
||||
'cleanLastSeen',
|
||||
);
|
||||
}
|
||||
|
||||
schedulerService.schedule(
|
||||
lastSeenService.store.bind(lastSeenService),
|
||||
secondsToMilliseconds(30),
|
||||
'storeLastSeen',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
apiTokenService.fetchActiveTokens.bind(apiTokenService),
|
||||
minutesToMilliseconds(1),
|
||||
'fetchActiveTokens',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
apiTokenService.updateLastSeen.bind(apiTokenService),
|
||||
minutesToMilliseconds(3),
|
||||
'updateLastSeen',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
instanceStatsService.refreshStatsSnapshot.bind(instanceStatsService),
|
||||
minutesToMilliseconds(5),
|
||||
'refreshStatsSnapshot',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.removeInstancesOlderThanTwoDays.bind(
|
||||
clientInstanceService,
|
||||
),
|
||||
hoursToMilliseconds(24),
|
||||
'removeInstancesOlderThanTwoDays',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.bulkAdd.bind(clientInstanceService),
|
||||
secondsToMilliseconds(5),
|
||||
'bulkAddInstances',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
clientInstanceService.announceUnannounced.bind(clientInstanceService),
|
||||
minutesToMilliseconds(5),
|
||||
'announceUnannounced',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
projectService.statusJob.bind(projectService),
|
||||
hoursToMilliseconds(24),
|
||||
'statusJob',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
projectHealthService.setHealthRating.bind(projectHealthService),
|
||||
hoursToMilliseconds(1),
|
||||
'setHealthRating',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
configurationRevisionService.updateMaxRevisionId.bind(
|
||||
configurationRevisionService,
|
||||
),
|
||||
secondsToMilliseconds(1),
|
||||
'updateMaxRevisionId',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
eventAnnouncerService.publishUnannouncedEvents.bind(
|
||||
eventAnnouncerService,
|
||||
),
|
||||
secondsToMilliseconds(1),
|
||||
'publishUnannouncedEvents',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
featureToggleService.updatePotentiallyStaleFeatures.bind(
|
||||
featureToggleService,
|
||||
),
|
||||
minutesToMilliseconds(1),
|
||||
'updatePotentiallyStaleFeatures',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
versionService.checkLatestVersion.bind(versionService),
|
||||
hoursToMilliseconds(48),
|
||||
'checkLatestVersion',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
proxyService.fetchFrontendSettings.bind(proxyService),
|
||||
minutesToMilliseconds(2),
|
||||
'fetchFrontendSettings',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.bulkAdd().catch(console.error);
|
||||
},
|
||||
secondsToMilliseconds(5),
|
||||
'bulkAddMetrics',
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
() => {
|
||||
clientMetricsServiceV2.clearMetrics(48).catch(console.error);
|
||||
},
|
||||
hoursToMilliseconds(12),
|
||||
'clearMetrics',
|
||||
);
|
||||
};
|
||||
|
||||
export const createServices = (
|
||||
stores: IUnleashStores,
|
||||
config: IUnleashConfig,
|
||||
@ -438,13 +285,11 @@ export const createServices = (
|
||||
db ? createGetProductionChanges(db) : createFakeGetProductionChanges(),
|
||||
);
|
||||
|
||||
const schedulerService = new SchedulerService(config.getLogger);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
|
||||
const maintenanceService = new MaintenanceService(
|
||||
stores,
|
||||
config,
|
||||
settingService,
|
||||
schedulerService,
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const eventAnnouncerService = new EventAnnouncerService(stores, config);
|
||||
|
@ -1,17 +1,40 @@
|
||||
import { SchedulerService } from './scheduler-service';
|
||||
import { SchedulerService } from '../features/scheduler/scheduler-service';
|
||||
import MaintenanceService from './maintenance-service';
|
||||
import { IUnleashStores } from '../types';
|
||||
import SettingService from './setting-service';
|
||||
import { createTestConfig } from '../../test/config/test-config';
|
||||
import FakeSettingStore from '../../test/fixtures/fake-setting-store';
|
||||
import EventService from './event-service';
|
||||
|
||||
test('Maintenance on should pause scheduler', async () => {
|
||||
test('Scheduler should run scheduled functions if maintenance mode is off', async () => {
|
||||
const config = createTestConfig();
|
||||
const schedulerService = new SchedulerService(config.getLogger);
|
||||
const maintenanceService = new MaintenanceService(
|
||||
{} as IUnleashStores,
|
||||
config,
|
||||
{ insert() {} } as unknown as SettingService,
|
||||
schedulerService,
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
const job = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 10, 'test-id');
|
||||
|
||||
expect(job).toBeCalledTimes(1);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Scheduler should not run scheduled functions if maintenance mode is on', async () => {
|
||||
const config = createTestConfig();
|
||||
const settingStore = new FakeSettingStore();
|
||||
const settingService = new SettingService({ settingStore }, config, {
|
||||
storeEvent() {},
|
||||
} as unknown as EventService);
|
||||
const maintenanceService = new MaintenanceService(config, settingService);
|
||||
const schedulerService = new SchedulerService(
|
||||
config.getLogger,
|
||||
maintenanceService,
|
||||
);
|
||||
|
||||
await maintenanceService.toggleMaintenanceMode(
|
||||
@ -19,26 +42,10 @@ test('Maintenance on should pause scheduler', async () => {
|
||||
'irrelevant user',
|
||||
);
|
||||
|
||||
expect(schedulerService.getMode()).toBe('paused');
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Maintenance off should resume scheduler', async () => {
|
||||
const config = createTestConfig({ disableScheduler: false });
|
||||
const schedulerService = new SchedulerService(config.getLogger);
|
||||
schedulerService.pause();
|
||||
const maintenanceService = new MaintenanceService(
|
||||
{} as IUnleashStores,
|
||||
config,
|
||||
{ insert() {} } as unknown as SettingService,
|
||||
schedulerService,
|
||||
);
|
||||
|
||||
await maintenanceService.toggleMaintenanceMode(
|
||||
{ enabled: false },
|
||||
'irrelevant user',
|
||||
);
|
||||
|
||||
expect(schedulerService.getMode()).toBe('active');
|
||||
const job = jest.fn();
|
||||
|
||||
await schedulerService.schedule(job, 10, 'test-id');
|
||||
|
||||
expect(job).toBeCalledTimes(0);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
@ -1,40 +1,20 @@
|
||||
import { IUnleashConfig, IUnleashStores } from '../types';
|
||||
import { IUnleashConfig } from '../types';
|
||||
import { Logger } from '../logger';
|
||||
import { IPatStore } from '../types/stores/pat-store';
|
||||
import { IEventStore } from '../types/stores/event-store';
|
||||
import SettingService from './setting-service';
|
||||
import { maintenanceSettingsKey } from '../types/settings/maintenance-settings';
|
||||
import { MaintenanceSchema } from '../openapi/spec/maintenance-schema';
|
||||
import { SchedulerService } from './scheduler-service';
|
||||
|
||||
export default class MaintenanceService {
|
||||
private config: IUnleashConfig;
|
||||
|
||||
private logger: Logger;
|
||||
|
||||
private patStore: IPatStore;
|
||||
|
||||
private eventStore: IEventStore;
|
||||
|
||||
private settingService: SettingService;
|
||||
|
||||
private schedulerService: SchedulerService;
|
||||
|
||||
constructor(
|
||||
{
|
||||
patStore,
|
||||
eventStore,
|
||||
}: Pick<IUnleashStores, 'patStore' | 'eventStore'>,
|
||||
config: IUnleashConfig,
|
||||
settingService: SettingService,
|
||||
schedulerService: SchedulerService,
|
||||
) {
|
||||
constructor(config: IUnleashConfig, settingService: SettingService) {
|
||||
this.config = config;
|
||||
this.logger = config.getLogger('services/pat-service.ts');
|
||||
this.patStore = patStore;
|
||||
this.eventStore = eventStore;
|
||||
this.settingService = settingService;
|
||||
this.schedulerService = schedulerService;
|
||||
}
|
||||
|
||||
async isMaintenanceMode(): Promise<boolean> {
|
||||
@ -56,11 +36,6 @@ export default class MaintenanceService {
|
||||
setting: MaintenanceSchema,
|
||||
user: string,
|
||||
): Promise<void> {
|
||||
if (setting.enabled) {
|
||||
this.schedulerService.pause();
|
||||
} else if (!this.config.disableScheduler) {
|
||||
this.schedulerService.resume();
|
||||
}
|
||||
return this.settingService.insert(
|
||||
maintenanceSettingsKey,
|
||||
setting,
|
||||
|
@ -1,148 +0,0 @@
|
||||
import { SchedulerService } from './scheduler-service';
|
||||
import { LogProvider } from '../logger';
|
||||
|
||||
function ms(timeMs) {
|
||||
return new Promise((resolve) => setTimeout(resolve, timeMs));
|
||||
}
|
||||
|
||||
const getLogger = () => {
|
||||
const records: any[] = [];
|
||||
const logger: LogProvider = () => ({
|
||||
error(...args: any[]) {
|
||||
records.push(args);
|
||||
},
|
||||
debug() {},
|
||||
info() {},
|
||||
warn() {},
|
||||
fatal() {},
|
||||
});
|
||||
const getRecords = () => records;
|
||||
|
||||
return { logger, getRecords };
|
||||
};
|
||||
|
||||
test('Schedules job immediately', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
|
||||
schedulerService.schedule(job, 10, 'test-id');
|
||||
|
||||
expect(job).toBeCalledTimes(1);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Does not schedule job immediately when paused', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
|
||||
schedulerService.pause();
|
||||
schedulerService.schedule(job, 10, 'test-id-2');
|
||||
|
||||
expect(job).toBeCalledTimes(0);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule a single regular job', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
|
||||
schedulerService.schedule(job, 50, 'test-id-3');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Scheduled job ignored in a paused mode', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
|
||||
schedulerService.pause();
|
||||
schedulerService.schedule(job, 50, 'test-id-4');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(0);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can resume paused job', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
|
||||
schedulerService.pause();
|
||||
schedulerService.schedule(job, 50, 'test-id-5');
|
||||
schedulerService.resume();
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(1);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule multiple jobs at the same interval', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
const anotherJob = jest.fn();
|
||||
|
||||
schedulerService.schedule(job, 50, 'test-id-6');
|
||||
schedulerService.schedule(anotherJob, 50, 'test-id-7');
|
||||
await ms(75);
|
||||
|
||||
expect(job).toBeCalledTimes(2);
|
||||
expect(anotherJob).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can schedule multiple jobs at the different intervals', async () => {
|
||||
const { logger } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = jest.fn();
|
||||
const anotherJob = jest.fn();
|
||||
|
||||
schedulerService.schedule(job, 100, 'test-id-8');
|
||||
schedulerService.schedule(anotherJob, 200, 'test-id-9');
|
||||
await ms(250);
|
||||
|
||||
expect(job).toBeCalledTimes(3);
|
||||
expect(anotherJob).toBeCalledTimes(2);
|
||||
schedulerService.stop();
|
||||
});
|
||||
|
||||
test('Can handle crash of a async job', async () => {
|
||||
const { logger, getRecords } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = async () => {
|
||||
await Promise.reject('async reason');
|
||||
};
|
||||
|
||||
schedulerService.schedule(job, 50, 'test-id-10');
|
||||
await ms(75);
|
||||
|
||||
schedulerService.stop();
|
||||
expect(getRecords()).toEqual([
|
||||
['scheduled job failed | id: test-id-10 | async reason'],
|
||||
['scheduled job failed | id: test-id-10 | async reason'],
|
||||
]);
|
||||
});
|
||||
|
||||
test('Can handle crash of a sync job', async () => {
|
||||
const { logger, getRecords } = getLogger();
|
||||
const schedulerService = new SchedulerService(logger);
|
||||
const job = () => {
|
||||
throw new Error('sync reason');
|
||||
};
|
||||
|
||||
schedulerService.schedule(job, 50, 'test-id-11');
|
||||
await ms(75);
|
||||
|
||||
schedulerService.stop();
|
||||
expect(getRecords()).toEqual([
|
||||
['scheduled job failed | id: test-id-11 | Error: sync reason'],
|
||||
['scheduled job failed | id: test-id-11 | Error: sync reason'],
|
||||
]);
|
||||
});
|
@ -37,7 +37,7 @@ import { InstanceStatsService } from '../features/instance-stats/instance-stats-
|
||||
import { FavoritesService } from '../services/favorites-service';
|
||||
import MaintenanceService from '../services/maintenance-service';
|
||||
import { AccountService } from '../services/account-service';
|
||||
import { SchedulerService } from '../services/scheduler-service';
|
||||
import { SchedulerService } from '../features/scheduler/scheduler-service';
|
||||
import { Knex } from 'knex';
|
||||
import {
|
||||
IExportService,
|
||||
|
Loading…
Reference in New Issue
Block a user