1
0
mirror of https://github.com/Unleash/unleash.git synced 2025-02-09 00:18:00 +01:00

chore: add a class to handle aggregation queries (#8446)

## About the changes
We have many aggregation queries that run on a schedule:
f63496d47f/src/lib/metrics.ts (L714-L719)

These staticCounters are usually doing db query aggregations that
traverse tables and we run all of them in parallel:
f63496d47f/src/lib/metrics.ts (L410-L412)

This can add strain to the db. This PR suggests a way of handling these
queries in a more structured way, allowing us to run them sequentially
(therefore spreading the load):
f02fe87835/src/lib/metrics-gauge.ts (L38-L40)

As an additional benefit, we get both the gauge definition and the
queries in a single place:
f02fe87835/src/lib/metrics.ts (L131-L141)

This PR only tackles one metric, and it only focuses on gauges, to gather
initial feedback. The plan is to migrate these metrics and eventually
incorporate more types (e.g. counters).

---------

Co-authored-by: Nuno Góis <github@nunogois.com>
This commit is contained in:
Gastón Fournier 2024-10-18 11:11:22 +02:00 committed by GitHub
parent 88f396f6b6
commit a9f9be1efa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1197 additions and 904 deletions

View File

@ -4,11 +4,18 @@ import createStores from '../../../test/fixtures/store';
import VersionService from '../../services/version-service'; import VersionService from '../../services/version-service';
import { createFakeGetActiveUsers } from './getActiveUsers'; import { createFakeGetActiveUsers } from './getActiveUsers';
import { createFakeGetProductionChanges } from './getProductionChanges'; import { createFakeGetProductionChanges } from './getProductionChanges';
import { registerPrometheusMetrics } from '../../metrics';
import { register } from 'prom-client';
import type { IClientInstanceStore } from '../../types';
let instanceStatsService: InstanceStatsService; let instanceStatsService: InstanceStatsService;
let versionService: VersionService; let versionService: VersionService;
let clientInstanceStore: IClientInstanceStore;
let updateMetrics: () => Promise<void>;
beforeEach(() => { beforeEach(() => {
jest.clearAllMocks();
register.clear();
const config = createTestConfig(); const config = createTestConfig();
const stores = createStores(); const stores = createStores();
versionService = new VersionService( versionService = new VersionService(
@ -17,6 +24,7 @@ beforeEach(() => {
createFakeGetActiveUsers(), createFakeGetActiveUsers(),
createFakeGetProductionChanges(), createFakeGetProductionChanges(),
); );
clientInstanceStore = stores.clientInstanceStore;
instanceStatsService = new InstanceStatsService( instanceStatsService = new InstanceStatsService(
stores, stores,
config, config,
@ -25,23 +33,28 @@ beforeEach(() => {
createFakeGetProductionChanges(), createFakeGetProductionChanges(),
); );
jest.spyOn(instanceStatsService, 'refreshAppCountSnapshot'); const { collectDbMetrics } = registerPrometheusMetrics(
jest.spyOn(instanceStatsService, 'getLabeledAppCounts'); config,
stores,
undefined as unknown as string,
config.eventBus,
instanceStatsService,
);
updateMetrics = collectDbMetrics;
jest.spyOn(clientInstanceStore, 'getDistinctApplicationsCount');
jest.spyOn(instanceStatsService, 'getStats'); jest.spyOn(instanceStatsService, 'getStats');
// validate initial state without calls to these methods
expect(instanceStatsService.refreshAppCountSnapshot).toHaveBeenCalledTimes(
0,
);
expect(instanceStatsService.getStats).toHaveBeenCalledTimes(0); expect(instanceStatsService.getStats).toHaveBeenCalledTimes(0);
}); });
test('get snapshot should not call getStats', async () => { test('get snapshot should not call getStats', async () => {
await instanceStatsService.refreshAppCountSnapshot(); await updateMetrics();
expect(instanceStatsService.getLabeledAppCounts).toHaveBeenCalledTimes(1); expect(
clientInstanceStore.getDistinctApplicationsCount,
).toHaveBeenCalledTimes(3);
expect(instanceStatsService.getStats).toHaveBeenCalledTimes(0); expect(instanceStatsService.getStats).toHaveBeenCalledTimes(0);
// subsequent calls to getStatsSnapshot don't call getStats
for (let i = 0; i < 3; i++) { for (let i = 0; i < 3; i++) {
const { clientApps } = await instanceStatsService.getStats(); const { clientApps } = await instanceStatsService.getStats();
expect(clientApps).toStrictEqual([ expect(clientApps).toStrictEqual([
@ -51,12 +64,11 @@ test('get snapshot should not call getStats', async () => {
]); ]);
} }
// after querying the stats snapshot no call to getStats should be issued // after querying the stats snapshot no call to getStats should be issued
expect(instanceStatsService.getLabeledAppCounts).toHaveBeenCalledTimes(1); expect(
clientInstanceStore.getDistinctApplicationsCount,
).toHaveBeenCalledTimes(3);
}); });
test('before the snapshot is refreshed we can still get the appCount', async () => { test('before the snapshot is refreshed we can still get the appCount', async () => {
expect(instanceStatsService.refreshAppCountSnapshot).toHaveBeenCalledTimes(
0,
);
expect(instanceStatsService.getAppCountSnapshot('7d')).toBeUndefined(); expect(instanceStatsService.getAppCountSnapshot('7d')).toBeUndefined();
}); });

View File

@ -109,9 +109,9 @@ export class InstanceStatsService {
private appCount?: Partial<{ [key in TimeRange]: number }>; private appCount?: Partial<{ [key in TimeRange]: number }>;
private getActiveUsers: GetActiveUsers; getActiveUsers: GetActiveUsers;
private getProductionChanges: GetProductionChanges; getProductionChanges: GetProductionChanges;
private featureStrategiesReadModel: IFeatureStrategiesReadModel; private featureStrategiesReadModel: IFeatureStrategiesReadModel;
@ -180,25 +180,6 @@ export class InstanceStatsService {
this.featureStrategiesReadModel = featureStrategiesReadModel; this.featureStrategiesReadModel = featureStrategiesReadModel;
} }
async refreshAppCountSnapshot(): Promise<
Partial<{ [key in TimeRange]: number }>
> {
try {
this.appCount = await this.getLabeledAppCounts();
return this.appCount;
} catch (error) {
this.logger.warn(
'Unable to retrieve statistics. This will be retried',
error,
);
return {
'7d': 0,
'30d': 0,
allTime: 0,
};
}
}
getProjectModeCount(): Promise<ProjectModeCount[]> { getProjectModeCount(): Promise<ProjectModeCount[]> {
return this.projectStore.getProjectModeCounts(); return this.projectStore.getProjectModeCounts();
} }
@ -231,9 +212,6 @@ export class InstanceStatsService {
return settings?.enabled || false; return settings?.enabled || false;
} }
/**
* use getStatsSnapshot for low latency, sacrificing data-freshness
*/
async getStats(): Promise<InstanceStats> { async getStats(): Promise<InstanceStats> {
const versionInfo = await this.versionService.getVersionInfo(); const versionInfo = await this.versionService.getVersionInfo();
const [ const [
@ -265,22 +243,22 @@ export class InstanceStatsService {
] = await Promise.all([ ] = await Promise.all([
this.getToggleCount(), this.getToggleCount(),
this.getArchivedToggleCount(), this.getArchivedToggleCount(),
this.userStore.count(), this.getRegisteredUsers(),
this.userStore.countServiceAccounts(), this.countServiceAccounts(),
this.apiTokenStore.countByType(), this.countApiTokensByType(),
this.getActiveUsers(), this.getActiveUsers(),
this.getProjectModeCount(), this.getProjectModeCount(),
this.contextFieldStore.count(), this.contextFieldCount(),
this.groupStore.count(), this.groupCount(),
this.roleStore.count(), this.roleCount(),
this.roleStore.filteredCount({ type: CUSTOM_ROOT_ROLE_TYPE }), this.customRolesCount(),
this.roleStore.filteredCountInUse({ type: CUSTOM_ROOT_ROLE_TYPE }), this.customRolesCountInUse(),
this.environmentStore.count(), this.environmentCount(),
this.segmentStore.count(), this.segmentCount(),
this.strategyStore.count(), this.strategiesCount(),
this.hasSAML(), this.hasSAML(),
this.hasOIDC(), this.hasOIDC(),
this.appCount ? this.appCount : this.refreshAppCountSnapshot(), this.appCount ? this.appCount : this.getLabeledAppCounts(),
this.eventStore.deprecatedFilteredCount({ this.eventStore.deprecatedFilteredCount({
type: FEATURES_EXPORTED, type: FEATURES_EXPORTED,
}), }),
@ -288,7 +266,7 @@ export class InstanceStatsService {
type: FEATURES_IMPORTED, type: FEATURES_IMPORTED,
}), }),
this.getProductionChanges(), this.getProductionChanges(),
this.clientMetricsStore.countPreviousDayHourlyMetricsBuckets(), this.countPreviousDayHourlyMetricsBuckets(),
this.featureStrategiesReadModel.getMaxFeatureEnvironmentStrategies(), this.featureStrategiesReadModel.getMaxFeatureEnvironmentStrategies(),
this.featureStrategiesReadModel.getMaxConstraintValues(), this.featureStrategiesReadModel.getMaxConstraintValues(),
this.featureStrategiesReadModel.getMaxConstraintsPerStrategy(), this.featureStrategiesReadModel.getMaxConstraintsPerStrategy(),
@ -330,6 +308,59 @@ export class InstanceStatsService {
}; };
} }
groupCount(): Promise<number> {
return this.groupStore.count();
}
roleCount(): Promise<number> {
return this.roleStore.count();
}
customRolesCount(): Promise<number> {
return this.roleStore.filteredCount({ type: CUSTOM_ROOT_ROLE_TYPE });
}
customRolesCountInUse(): Promise<number> {
return this.roleStore.filteredCountInUse({
type: CUSTOM_ROOT_ROLE_TYPE,
});
}
segmentCount(): Promise<number> {
return this.segmentStore.count();
}
contextFieldCount(): Promise<number> {
return this.contextFieldStore.count();
}
strategiesCount(): Promise<number> {
return this.strategyStore.count();
}
environmentCount(): Promise<number> {
return this.environmentStore.count();
}
countPreviousDayHourlyMetricsBuckets(): Promise<{
enabledCount: number;
variantCount: number;
}> {
return this.clientMetricsStore.countPreviousDayHourlyMetricsBuckets();
}
countApiTokensByType(): Promise<Map<string, number>> {
return this.apiTokenStore.countByType();
}
getRegisteredUsers(): Promise<number> {
return this.userStore.count();
}
countServiceAccounts(): Promise<number> {
return this.userStore.countServiceAccounts();
}
async getLabeledAppCounts(): Promise< async getLabeledAppCounts(): Promise<
Partial<{ [key in TimeRange]: number }> Partial<{ [key in TimeRange]: number }>
> { > {
@ -338,11 +369,12 @@ export class InstanceStatsService {
this.clientInstanceStore.getDistinctApplicationsCount(30), this.clientInstanceStore.getDistinctApplicationsCount(30),
this.clientInstanceStore.getDistinctApplicationsCount(), this.clientInstanceStore.getDistinctApplicationsCount(),
]); ]);
return { this.appCount = {
'7d': t7d, '7d': t7d,
'30d': t30d, '30d': t30d,
allTime, allTime,
}; };
return this.appCount;
} }
getAppCountSnapshot(range: TimeRange): number | undefined { getAppCountSnapshot(range: TimeRange): number | undefined {

View File

@ -59,8 +59,12 @@ export const scheduleServices = async (
'updateLastSeen', 'updateLastSeen',
); );
// TODO this works fine for keeping labeledAppCounts up to date, but
// it would be nice if we can keep client_apps_total prometheus metric
// up to date. We'd need to have access to DbMetricsMonitor, which is
// where the metric is registered and call an update only for that metric
schedulerService.schedule( schedulerService.schedule(
instanceStatsService.refreshAppCountSnapshot.bind(instanceStatsService), instanceStatsService.getLabeledAppCounts.bind(instanceStatsService),
minutesToMilliseconds(5), minutesToMilliseconds(5),
'refreshAppCountSnapshot', 'refreshAppCountSnapshot',
); );

View File

@ -0,0 +1,114 @@
import { register } from 'prom-client';
import { createTestConfig } from '../test/config/test-config';
import type { IUnleashConfig } from './types';
import { DbMetricsMonitor } from './metrics-gauge';
const prometheusRegister = register;
let config: IUnleashConfig;
let dbMetrics: DbMetricsMonitor;
// The test configuration is built once for the whole file, with server
// metrics switched on.
beforeAll(async () => {
    const server = { serverMetrics: true };
    config = createTestConfig({ server });
});

// Every test starts from a monitor that has no gauges registered on it.
beforeEach(async () => {
    dbMetrics = new DbMetricsMonitor(config);
});
// A gauge registered without labels shows up in the prometheus output as
// `<name> <value>` after a refresh.
test('should collect registered metrics', async () => {
    dbMetrics.registerGaugeDbMetric({
        name: 'my_metric',
        help: 'This is the answer to life, the univers, and everything',
        labelNames: [],
        query: async () => 42,
        map: (answer) => ({ value: answer }),
    });

    await dbMetrics.refreshDbMetrics();

    expect(await prometheusRegister.metrics()).toMatch(/my_metric 42/);
});
// A gauge whose mapper returns labels is exposed with those labels attached
// to the sample.
test('should collect registered metrics with labels', async () => {
    dbMetrics.registerGaugeDbMetric({
        name: 'life_the_universe_and_everything',
        help: 'This is the answer to life, the univers, and everything',
        labelNames: ['test'],
        query: async () => 42,
        map: (answer) => ({ value: answer, labels: { test: 'case' } }),
    });

    await dbMetrics.refreshDbMetrics();

    expect(await prometheusRegister.metrics()).toMatch(
        /life_the_universe_and_everything\{test="case"\} 42/,
    );
});
// Several gauges (one unlabelled, one labelled) registered on the same monitor
// are all refreshed by a single refreshDbMetrics() call.
test('should collect multiple registered metrics with and without labels', async () => {
    dbMetrics.registerGaugeDbMetric({
        name: 'my_first_metric',
        help: 'This is the answer to life, the univers, and everything',
        labelNames: [],
        query: () => Promise.resolve(42),
        map: (result) => ({ value: result }),
    });
    dbMetrics.registerGaugeDbMetric({
        name: 'my_other_metric',
        help: 'This is Eulers number',
        labelNames: ['euler'],
        query: () => Promise.resolve(Math.E),
        map: (result) => ({ value: result, labels: { euler: 'number' } }),
    });

    await dbMetrics.refreshDbMetrics();

    const metrics = await prometheusRegister.metrics();
    expect(metrics).toMatch(/my_first_metric 42/);
    // The decimal point is escaped so the pattern only matches a literal
    // "2.71828" prefix, not "2" followed by any character.
    expect(metrics).toMatch(/my_other_metric\{euler="number"\} 2\.71828/);
});
// One query result can be fanned out into several samples, each with its own
// label set; findValue() then looks samples up by a subset of their labels.
test('should support different label and value pairs', async () => {
    dbMetrics.registerGaugeDbMetric({
        name: 'multi_dimensional',
        help: 'This metric has different values for different labels',
        labelNames: ['version', 'range'],
        query: async () => 2,
        map: (base) => {
            const linear = {
                value: base,
                labels: { version: '1', range: 'linear' },
            };
            const square = {
                value: base * base,
                labels: { version: '2', range: 'square' },
            };
            const half = {
                value: base / 2,
                labels: { version: '3', range: 'half' },
            };
            return [linear, square, half];
        },
    });

    await dbMetrics.refreshDbMetrics();

    expect(await prometheusRegister.metrics()).toMatch(
        /multi_dimensional\{version="1",range="linear"\} 2\nmulti_dimensional\{version="2",range="square"\} 4\nmulti_dimensional\{version="3",range="half"\} 1/,
    );

    // Lookups by label: known ranges resolve to their sample, an unknown
    // range resolves to undefined.
    const byRange: [string, number | undefined][] = [
        ['linear', 2],
        ['half', 1],
        ['square', 4],
        ['x', undefined],
    ];
    for (const [range, expected] of byRange) {
        expect(
            await dbMetrics.findValue('multi_dimensional', { range }),
        ).toBe(expected);
    }

    // Without labels the first sample wins; unknown metrics yield undefined.
    expect(await dbMetrics.findValue('multi_dimensional')).toBe(2); // first match
    expect(await dbMetrics.findValue('other')).toBeUndefined();
});

94
src/lib/metrics-gauge.ts Normal file
View File

@ -0,0 +1,94 @@
import type { Logger } from './logger';
import type { IUnleashConfig } from './types';
import { createGauge, type Gauge } from './util/metrics';
/**
 * Asynchronous source of the raw data behind a metric.
 * May resolve to `null`/`undefined`; the refresh task skips the update in that case.
 */
type Query<R> = () => Promise<R | undefined | null>;
/**
 * One sample to write to a gauge: the numeric value and, optionally, the
 * label values that identify which series of the gauge it belongs to.
 */
type MetricValue<L extends string> = {
value: number;
labels?: Record<L, string | number>;
};
/**
 * Converts a query result into the sample(s) to publish. Returning an array
 * lets a single query feed several label combinations of the same gauge.
 */
type MapResult<R, L extends string> = (
result: R,
) => MetricValue<L> | MetricValue<L>[];
/**
 * Everything needed to register a query-backed gauge. The whole object is
 * handed to `createGauge`; `name` is also the key under which the gauge is
 * stored and the identifier used in log messages.
 */
type GaugeDefinition<T, L extends string> = {
name: string;
help: string;
labelNames: L[];
// fetches the raw data
query: Query<T>;
// turns the raw data into gauge samples
map: MapResult<T, L>;
};
/** Refreshes a single gauge; takes no arguments and resolves with no value. */
type Task = () => Promise<void>;
/** A registered gauge paired with the task that refreshes it. */
interface GaugeUpdater {
target: Gauge<string>;
task: Task;
}
/**
 * Keeps gauges in sync with values that are obtained from queries.
 *
 * Every registered gauge gets its own refresh task. `refreshDbMetrics` awaits
 * those tasks one after another rather than starting them all at once.
 */
export class DbMetricsMonitor {
    /** Registered gauges and their refresh tasks, keyed by metric name. */
    private updaters: Map<string, GaugeUpdater> = new Map();

    private log: Logger;

    constructor({ getLogger }: Pick<IUnleashConfig, 'getLogger'>) {
        this.log = getLogger('gauge-metrics');
    }

    /** Normalises "one item or a list of items" to a list. */
    private asArray<T>(value: T | T[]): T[] {
        return Array.isArray(value) ? value : [value];
    }

    /**
     * Creates a gauge from `definition` and registers a task that refreshes it.
     *
     * The task runs `definition.query()`, maps the result to samples, resets
     * the gauge and writes the samples. It never rejects: failures are logged
     * as warnings. A `null`/`undefined` query result leaves the gauge as is.
     *
     * Registering a second definition with the same `name` replaces the
     * stored updater for that name.
     *
     * @param definition gauge metadata plus the query and mapper feeding it
     * @returns the refresh task, so callers can refresh this gauge on its own
     */
    registerGaugeDbMetric<T, L extends string>(
        definition: GaugeDefinition<T, L>,
    ): Task {
        const gauge = createGauge(definition);
        const task = async () => {
            try {
                const result = await definition.query();
                if (result === null || result === undefined) {
                    // nothing to report: keep whatever the gauge currently holds
                    return;
                }
                const samples = this.asArray(definition.map(result));
                // Validate BEFORE resetting. A value that is not a number at
                // runtime (e.g. a count delivered as a string) would otherwise
                // make the write throw after the gauge was already wiped,
                // dropping the valid samples that follow it.
                // NOTE(review): assumes the underlying gauge rejects
                // non-number values, as prom-client does; confirm in createGauge.
                const usable = samples.filter(
                    ({ value }) => typeof value === 'number',
                );
                const skipped = samples.length - usable.length;
                if (skipped > 0) {
                    this.log.warn(
                        `Ignoring ${skipped} non-numeric value(s) for ${definition.name}`,
                    );
                }
                // reset first so label combinations that no longer exist vanish
                gauge.reset();
                for (const { labels, value } of usable) {
                    if (labels) {
                        gauge.labels(labels).set(value);
                    } else {
                        gauge.set(value);
                    }
                }
            } catch (e) {
                this.log.warn(`Failed to refresh ${definition.name}`, e);
            }
        };
        this.updaters.set(definition.name, { target: gauge, task });
        return task;
    }

    /**
     * Refreshes every registered gauge, one at a time, in registration order.
     *
     * Declared as an arrow property so it can be passed around as a callback
     * without losing `this`.
     */
    refreshDbMetrics = async () => {
        // Snapshot the registrations so that gauges registered while a task is
        // being awaited do not become part of this pass.
        const pending = Array.from(this.updaters, ([name, { task }]) => ({
            name,
            task,
        }));
        for (const { name, task } of pending) {
            this.log.debug(`Refreshing metric ${name}`);
            await task();
        }
    };

    /**
     * Reads the current value of a registered gauge.
     *
     * @param name metric name used at registration
     * @param labels optional subset of labels a sample must match exactly
     *        (strict equality per label); when omitted the first sample is used
     * @returns the value of the first matching sample, or `undefined` when the
     *          metric is unknown, has no samples, or nothing matches
     */
    async findValue(
        name: string,
        labels?: Record<string, string | number>,
    ): Promise<number | undefined> {
        const snapshot = await this.updaters.get(name)?.target.gauge?.get();
        if (!snapshot) {
            return undefined;
        }
        const wanted = labels ? Object.entries(labels) : undefined;
        const match = wanted
            ? snapshot.values.find(({ labels: actual }) =>
                  wanted.every(([key, value]) => actual[key] === value),
              )
            : snapshot.values[0];
        return match?.value;
    }
}

View File

@ -15,7 +15,11 @@ import {
FEATURE_UPDATED, FEATURE_UPDATED,
PROJECT_ENVIRONMENT_REMOVED, PROJECT_ENVIRONMENT_REMOVED,
} from './types/events'; } from './types/events';
import { createMetricsMonitor } from './metrics'; import {
createMetricsMonitor,
registerPrometheusMetrics,
registerPrometheusPostgresMetrics,
} from './metrics';
import createStores from '../test/fixtures/store'; import createStores from '../test/fixtures/store';
import { InstanceStatsService } from './features/instance-stats/instance-stats-service'; import { InstanceStatsService } from './features/instance-stats/instance-stats-service';
import VersionService from './services/version-service'; import VersionService from './services/version-service';
@ -46,6 +50,7 @@ let schedulerService: SchedulerService;
let featureLifeCycleStore: IFeatureLifecycleStore; let featureLifeCycleStore: IFeatureLifecycleStore;
let featureLifeCycleReadModel: IFeatureLifecycleReadModel; let featureLifeCycleReadModel: IFeatureLifecycleReadModel;
let db: ITestDb; let db: ITestDb;
let refreshDbMetrics: () => Promise<void>;
beforeAll(async () => { beforeAll(async () => {
const config = createTestConfig({ const config = createTestConfig({
@ -102,16 +107,16 @@ beforeAll(async () => {
}, },
}; };
await monitor.startMonitoring( const { collectDbMetrics, collectStaticCounters } =
config, registerPrometheusMetrics(
stores, config,
'4.0.0', stores,
eventBus, '4.0.0',
statsService, eventBus,
schedulerService, statsService,
// @ts-ignore - We don't want a full knex implementation for our tests, it's enough that it actually yields the numbers we want. );
metricsDbConf, refreshDbMetrics = collectDbMetrics;
); await collectStaticCounters();
}); });
afterAll(async () => { afterAll(async () => {
@ -212,6 +217,7 @@ test('should collect metrics for function timings', async () => {
}); });
test('should collect metrics for feature flag size', async () => { test('should collect metrics for feature flag size', async () => {
await refreshDbMetrics();
const metrics = await prometheusRegister.metrics(); const metrics = await prometheusRegister.metrics();
expect(metrics).toMatch(/feature_toggles_total\{version="(.*)"\} 0/); expect(metrics).toMatch(/feature_toggles_total\{version="(.*)"\} 0/);
}); });
@ -222,12 +228,13 @@ test('should collect metrics for archived feature flag size', async () => {
}); });
test('should collect metrics for total client apps', async () => { test('should collect metrics for total client apps', async () => {
await statsService.refreshAppCountSnapshot(); await refreshDbMetrics();
const metrics = await prometheusRegister.metrics(); const metrics = await prometheusRegister.metrics();
expect(metrics).toMatch(/client_apps_total\{range="(.*)"\} 0/); expect(metrics).toMatch(/client_apps_total\{range="(.*)"\} 0/);
}); });
test('Should collect metrics for database', async () => { test('Should collect metrics for database', async () => {
registerPrometheusPostgresMetrics(db.rawDatabase, eventBus, '15.0.0');
const metrics = await prometheusRegister.metrics(); const metrics = await prometheusRegister.metrics();
expect(metrics).toMatch(/db_pool_max/); expect(metrics).toMatch(/db_pool_max/);
expect(metrics).toMatch(/db_pool_min/); expect(metrics).toMatch(/db_pool_min/);

File diff suppressed because it is too large Load Diff

View File

@ -6,10 +6,12 @@ import {
import getLogger from '../../../fixtures/no-logger'; import getLogger from '../../../fixtures/no-logger';
import type { IUnleashStores } from '../../../../lib/types'; import type { IUnleashStores } from '../../../../lib/types';
import { ApiTokenType } from '../../../../lib/types/models/api-token'; import { ApiTokenType } from '../../../../lib/types/models/api-token';
import { registerPrometheusMetrics } from '../../../../lib/metrics';
let app: IUnleashTest; let app: IUnleashTest;
let db: ITestDb; let db: ITestDb;
let stores: IUnleashStores; let stores: IUnleashStores;
let refreshDbMetrics: () => Promise<void>;
beforeAll(async () => { beforeAll(async () => {
db = await dbInit('instance_admin_api_serial', getLogger); db = await dbInit('instance_admin_api_serial', getLogger);
@ -26,6 +28,15 @@ beforeAll(async () => {
}, },
db.rawDatabase, db.rawDatabase,
); );
const { collectDbMetrics } = registerPrometheusMetrics(
app.config,
stores,
undefined as unknown as string,
app.config.eventBus,
app.services.instanceStatsService,
);
refreshDbMetrics = collectDbMetrics;
}); });
afterAll(async () => { afterAll(async () => {
@ -39,6 +50,8 @@ test('should return instance statistics', async () => {
createdByUserId: 9999, createdByUserId: 9999,
}); });
await refreshDbMetrics();
return app.request return app.request
.get('/api/admin/instance-admin/statistics') .get('/api/admin/instance-admin/statistics')
.expect('Content-Type', /json/) .expect('Content-Type', /json/)