mirror of
https://github.com/Unleash/unleash.git
synced 2025-03-04 00:18:40 +01:00
refactor: batch feature metric inserts (#1928)
* refactor: batch feature metric inserts * fix: use startOfHour when collapsing metrics * refactor: avoid extra loop to sum yes/no values * refactor: add experimental flag for batching metrics
This commit is contained in:
parent
d91b91b56f
commit
d2999d816d
@ -61,7 +61,10 @@ Object {
|
||||
Symbol(kCapture): false,
|
||||
},
|
||||
"eventHook": undefined,
|
||||
"experimental": Object {},
|
||||
"experimental": Object {
|
||||
"batchMetrics": false,
|
||||
"embedProxy": false,
|
||||
},
|
||||
"frontendApiOrigins": Array [],
|
||||
"getLogger": [Function],
|
||||
"import": Object {
|
||||
|
@ -54,8 +54,19 @@ function mergeAll<T>(objects: Partial<T>[]): T {
|
||||
}
|
||||
|
||||
function loadExperimental(options: IUnleashOptions): IExperimentalOptions {
|
||||
return options.experimental || {};
|
||||
return {
|
||||
...options.experimental,
|
||||
embedProxy: parseEnvVarBoolean(
|
||||
process.env.UNLEASH_EXPERIMENTAL_EMBED_PROXY,
|
||||
Boolean(options.experimental?.embedProxy),
|
||||
),
|
||||
batchMetrics: parseEnvVarBoolean(
|
||||
process.env.UNLEASH_EXPERIMENTAL_BATCH_METRICS,
|
||||
Boolean(options.experimental?.batchMetrics),
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
const defaultClientCachingOptions: IClientCachingOption = {
|
||||
enabled: true,
|
||||
maxAge: 600,
|
||||
|
@ -7,6 +7,7 @@ import {
|
||||
} from '../types/stores/client-metrics-store-v2';
|
||||
import NotFoundError from '../error/notfound-error';
|
||||
import { startOfHour } from 'date-fns';
|
||||
import { collapseHourlyMetrics } from '../util/collapseHourlyMetrics';
|
||||
|
||||
interface ClientMetricsEnvTable {
|
||||
feature_name: string;
|
||||
@ -28,7 +29,7 @@ const fromRow = (row: ClientMetricsEnvTable) => ({
|
||||
no: Number(row.no),
|
||||
});
|
||||
|
||||
const toRow = (metric: IClientMetricsEnv) => ({
|
||||
const toRow = (metric: IClientMetricsEnv): ClientMetricsEnvTable => ({
|
||||
feature_name: metric.featureName,
|
||||
app_name: metric.appName,
|
||||
environment: metric.environment,
|
||||
@ -102,22 +103,11 @@ export class ClientMetricsStoreV2 implements IClientMetricsStoreV2 {
|
||||
if (!metrics || metrics.length == 0) {
|
||||
return;
|
||||
}
|
||||
const rows = metrics.map(toRow);
|
||||
|
||||
const batch = rows.reduce((prev, curr) => {
|
||||
// eslint-disable-next-line prettier/prettier
|
||||
const key = `${curr.feature_name}_${curr.app_name}_${curr.environment}_${curr.timestamp.getTime()}`;
|
||||
if (prev[key]) {
|
||||
prev[key].yes += curr.yes;
|
||||
prev[key].no += curr.no;
|
||||
} else {
|
||||
prev[key] = curr;
|
||||
}
|
||||
return prev;
|
||||
}, {});
|
||||
const rows = collapseHourlyMetrics(metrics).map(toRow);
|
||||
|
||||
// Sort the rows to avoid deadlocks
|
||||
const batchRow = Object.values<ClientMetricsEnvTable>(batch).sort(
|
||||
const sortedRows = rows.sort(
|
||||
(a, b) =>
|
||||
a.feature_name.localeCompare(b.feature_name) ||
|
||||
a.app_name.localeCompare(b.app_name) ||
|
||||
@ -126,7 +116,7 @@ export class ClientMetricsStoreV2 implements IClientMetricsStoreV2 {
|
||||
|
||||
// Consider rewriting to SQL batch!
|
||||
const insert = this.db<ClientMetricsEnvTable>(TABLE)
|
||||
.insert(batchRow)
|
||||
.insert(sortedRows)
|
||||
.toQuery();
|
||||
|
||||
const query = `${insert.toString()} ON CONFLICT (feature_name, app_name, environment, timestamp) DO UPDATE SET "yes" = "client_metrics_env"."yes" + EXCLUDED.yes, "no" = "client_metrics_env"."no" + EXCLUDED.no`;
|
||||
|
@ -4,6 +4,7 @@ export interface IExperimentalOptions {
|
||||
userGroups?: boolean;
|
||||
anonymiseEventLog?: boolean;
|
||||
embedProxy?: boolean;
|
||||
batchMetrics?: boolean;
|
||||
}
|
||||
|
||||
export interface IExperimentalToggle {
|
||||
|
@ -8,47 +8,64 @@ import {
|
||||
IClientMetricsStoreV2,
|
||||
} from '../../types/stores/client-metrics-store-v2';
|
||||
import { clientMetricsSchema } from './schema';
|
||||
import { hoursToMilliseconds, minutesToMilliseconds } from 'date-fns';
|
||||
import { hoursToMilliseconds, secondsToMilliseconds } from 'date-fns';
|
||||
import { IFeatureToggleStore } from '../../types/stores/feature-toggle-store';
|
||||
import EventEmitter from 'events';
|
||||
import { CLIENT_METRICS } from '../../types/events';
|
||||
import ApiUser from '../../types/api-user';
|
||||
import { ALL } from '../../types/models/api-token';
|
||||
import User from '../../types/user';
|
||||
import { collapseHourlyMetrics } from '../../util/collapseHourlyMetrics';
|
||||
import { IExperimentalOptions } from '../../experimental';
|
||||
|
||||
export default class ClientMetricsServiceV2 {
|
||||
private timer: NodeJS.Timeout;
|
||||
private timers: NodeJS.Timeout[] = [];
|
||||
|
||||
private unsavedMetrics: IClientMetricsEnv[] = [];
|
||||
|
||||
private clientMetricsStoreV2: IClientMetricsStoreV2;
|
||||
|
||||
private featureToggleStore: IFeatureToggleStore;
|
||||
|
||||
private experimental: IExperimentalOptions;
|
||||
|
||||
private eventBus: EventEmitter;
|
||||
|
||||
private logger: Logger;
|
||||
|
||||
private bulkInterval: number;
|
||||
|
||||
constructor(
|
||||
{
|
||||
featureToggleStore,
|
||||
clientMetricsStoreV2,
|
||||
}: Pick<IUnleashStores, 'featureToggleStore' | 'clientMetricsStoreV2'>,
|
||||
{ eventBus, getLogger }: Pick<IUnleashConfig, 'eventBus' | 'getLogger'>,
|
||||
bulkInterval = minutesToMilliseconds(5),
|
||||
{
|
||||
experimental,
|
||||
eventBus,
|
||||
getLogger,
|
||||
}: Pick<IUnleashConfig, 'eventBus' | 'getLogger' | 'experimental'>,
|
||||
bulkInterval = secondsToMilliseconds(5),
|
||||
) {
|
||||
this.featureToggleStore = featureToggleStore;
|
||||
this.clientMetricsStoreV2 = clientMetricsStoreV2;
|
||||
this.experimental = experimental;
|
||||
this.eventBus = eventBus;
|
||||
this.logger = getLogger(
|
||||
'/services/client-metrics/client-metrics-service-v2.ts',
|
||||
);
|
||||
|
||||
this.bulkInterval = bulkInterval;
|
||||
this.timer = setInterval(async () => {
|
||||
await this.clientMetricsStoreV2.clearMetrics(48);
|
||||
}, hoursToMilliseconds(12));
|
||||
this.timer.unref();
|
||||
if (this.experimental.batchMetrics) {
|
||||
this.timers.push(
|
||||
setInterval(() => {
|
||||
this.bulkAdd().catch(console.error);
|
||||
}, bulkInterval).unref(),
|
||||
);
|
||||
}
|
||||
|
||||
this.timers.push(
|
||||
setInterval(() => {
|
||||
this.clientMetricsStoreV2.clearMetrics(48).catch(console.error);
|
||||
}, hoursToMilliseconds(12)).unref(),
|
||||
);
|
||||
}
|
||||
|
||||
async registerClientMetrics(
|
||||
@ -74,11 +91,28 @@ export default class ClientMetricsServiceV2 {
|
||||
}))
|
||||
.filter((item) => !(item.yes === 0 && item.no === 0));
|
||||
|
||||
// TODO: should we aggregate for a few minutes (bulkInterval) before pushing to DB?
|
||||
await this.clientMetricsStoreV2.batchInsertMetrics(clientMetrics);
|
||||
if (this.experimental.batchMetrics) {
|
||||
this.unsavedMetrics = collapseHourlyMetrics([
|
||||
...this.unsavedMetrics,
|
||||
...clientMetrics,
|
||||
]);
|
||||
} else {
|
||||
await this.clientMetricsStoreV2.batchInsertMetrics(clientMetrics);
|
||||
}
|
||||
|
||||
this.eventBus.emit(CLIENT_METRICS, value);
|
||||
}
|
||||
|
||||
async bulkAdd(): Promise<void> {
|
||||
if (this.experimental.batchMetrics && this.unsavedMetrics.length > 0) {
|
||||
// Make a copy of `unsavedMetrics` in case new metrics
|
||||
// arrive while awaiting `batchInsertMetrics`.
|
||||
const copy = [...this.unsavedMetrics];
|
||||
this.unsavedMetrics = [];
|
||||
await this.clientMetricsStoreV2.batchInsertMetrics(copy);
|
||||
}
|
||||
}
|
||||
|
||||
// Overview over usage last "hour" bucket and all applications using the toggle
|
||||
async getFeatureToggleMetricsSummary(
|
||||
featureName: string,
|
||||
@ -137,7 +171,6 @@ export default class ClientMetricsServiceV2 {
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
this.timers.forEach(clearInterval);
|
||||
}
|
||||
}
|
||||
|
110
src/lib/util/collapseHourlyMetrics.test.ts
Normal file
110
src/lib/util/collapseHourlyMetrics.test.ts
Normal file
@ -0,0 +1,110 @@
|
||||
import { collapseHourlyMetrics } from './collapseHourlyMetrics';
|
||||
import { IClientMetricsEnv } from '../types/stores/client-metrics-store-v2';
|
||||
import { addMinutes, startOfHour } from 'date-fns';
|
||||
|
||||
test('collapseHourlyMetrics', () => {
|
||||
const timestamp = startOfHour(new Date());
|
||||
|
||||
const metricAX1: IClientMetricsEnv = {
|
||||
featureName: 'a',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp: addMinutes(timestamp, 1),
|
||||
yes: 1,
|
||||
no: 11,
|
||||
};
|
||||
|
||||
const metricAX2: IClientMetricsEnv = {
|
||||
featureName: 'a',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp: addMinutes(timestamp, 2),
|
||||
yes: 2,
|
||||
no: 12,
|
||||
};
|
||||
|
||||
const metricBX: IClientMetricsEnv = {
|
||||
featureName: 'b',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp: addMinutes(timestamp, 3),
|
||||
yes: 101,
|
||||
no: 1001,
|
||||
};
|
||||
|
||||
const metricBY: IClientMetricsEnv = {
|
||||
featureName: 'b',
|
||||
appName: 'y',
|
||||
environment: 'y',
|
||||
timestamp: addMinutes(timestamp, 4),
|
||||
yes: 102,
|
||||
no: 1002,
|
||||
};
|
||||
|
||||
expect(
|
||||
collapseHourlyMetrics([metricAX1, metricAX2, metricBX, metricBY]),
|
||||
).toEqual([
|
||||
{
|
||||
featureName: 'a',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp,
|
||||
yes: 3,
|
||||
no: 23,
|
||||
},
|
||||
{
|
||||
featureName: 'b',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp,
|
||||
yes: 101,
|
||||
no: 1001,
|
||||
},
|
||||
{
|
||||
featureName: 'b',
|
||||
appName: 'y',
|
||||
environment: 'y',
|
||||
timestamp,
|
||||
yes: 102,
|
||||
no: 1002,
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
collapseHourlyMetrics([
|
||||
metricAX1,
|
||||
metricAX1,
|
||||
metricAX2,
|
||||
metricAX2,
|
||||
metricBX,
|
||||
metricBX,
|
||||
metricBY,
|
||||
metricBY,
|
||||
]),
|
||||
).toEqual([
|
||||
{
|
||||
featureName: 'a',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp,
|
||||
yes: 6,
|
||||
no: 46,
|
||||
},
|
||||
{
|
||||
featureName: 'b',
|
||||
appName: 'x',
|
||||
environment: 'x',
|
||||
timestamp,
|
||||
yes: 202,
|
||||
no: 2002,
|
||||
},
|
||||
{
|
||||
featureName: 'b',
|
||||
appName: 'y',
|
||||
environment: 'y',
|
||||
timestamp,
|
||||
yes: 204,
|
||||
no: 2004,
|
||||
},
|
||||
]);
|
||||
});
|
51
src/lib/util/collapseHourlyMetrics.ts
Normal file
51
src/lib/util/collapseHourlyMetrics.ts
Normal file
@ -0,0 +1,51 @@
|
||||
import { IClientMetricsEnv } from '../types/stores/client-metrics-store-v2';
|
||||
import { startOfHour } from 'date-fns';
|
||||
|
||||
const groupBy = <T>(list: T[], createKey: (item: T) => string): T[][] => {
|
||||
const groups = list.reduce((acc, item) => {
|
||||
const key = createKey(item);
|
||||
acc[key] = acc[key] ?? [];
|
||||
acc[key].push(item);
|
||||
return acc;
|
||||
}, {} as Record<string, T[]>);
|
||||
|
||||
return Object.values(groups);
|
||||
};
|
||||
|
||||
const createMetricKey = (metric: IClientMetricsEnv): string => {
|
||||
return [
|
||||
metric.featureName,
|
||||
metric.appName,
|
||||
metric.environment,
|
||||
metric.timestamp.getTime(),
|
||||
].join();
|
||||
};
|
||||
|
||||
const sumYesNo = (
|
||||
metrics: IClientMetricsEnv[],
|
||||
): Pick<IClientMetricsEnv, 'yes' | 'no'> => {
|
||||
return metrics.reduce(
|
||||
(acc, metric) => ({
|
||||
yes: acc.yes + metric.yes,
|
||||
no: acc.no + metric.no,
|
||||
}),
|
||||
{
|
||||
yes: 0,
|
||||
no: 0,
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
export const collapseHourlyMetrics = (
|
||||
metrics: IClientMetricsEnv[],
|
||||
): IClientMetricsEnv[] => {
|
||||
const hourlyMetrics = metrics.map((metric) => ({
|
||||
...metric,
|
||||
timestamp: startOfHour(metric.timestamp),
|
||||
}));
|
||||
|
||||
return groupBy(hourlyMetrics, createMetricKey).flatMap((group) => ({
|
||||
...group[0],
|
||||
...sumYesNo(group),
|
||||
}));
|
||||
};
|
@ -35,6 +35,7 @@ process.nextTick(async () => {
|
||||
anonymiseEventLog: false,
|
||||
userGroups: true,
|
||||
embedProxy: true,
|
||||
batchMetrics: true,
|
||||
},
|
||||
authentication: {
|
||||
initApiTokens: [
|
||||
|
@ -25,6 +25,7 @@ export function createTestConfig(config?: IUnleashOptions): IUnleashConfig {
|
||||
experimental: {
|
||||
userGroups: true,
|
||||
embedProxy: true,
|
||||
batchMetrics: true,
|
||||
},
|
||||
};
|
||||
const options = mergeAll<IUnleashOptions>([testConfig, config]);
|
||||
|
@ -39,6 +39,7 @@ test('should enrich metrics with environment from api-token', async () => {
|
||||
.send(metricsExample)
|
||||
.expect(202);
|
||||
|
||||
await app.services.clientMetricsServiceV2.bulkAdd();
|
||||
const all = await clientMetricsStoreV2.getAll();
|
||||
expect(all[0].environment).toBe('some');
|
||||
});
|
||||
|
@ -94,6 +94,7 @@ test('should pick up environment from token', async () => {
|
||||
})
|
||||
.expect(202);
|
||||
|
||||
await app.services.clientMetricsServiceV2.bulkAdd();
|
||||
const metrics = await db.stores.clientMetricsStoreV2.getAll();
|
||||
expect(metrics[0].environment).toBe('test');
|
||||
expect(metrics[0].appName).toBe('some-fancy-app');
|
||||
|
@ -284,6 +284,7 @@ test('should store proxy client metrics', async () => {
|
||||
})
|
||||
.expect(200)
|
||||
.expect((res) => expect(res.text).toEqual('OK'));
|
||||
await app.services.clientMetricsServiceV2.bulkAdd();
|
||||
await app.request
|
||||
.get(`/api/admin/client-metrics/features/${featureName}`)
|
||||
.set('Authorization', adminToken.secret)
|
||||
|
@ -44,6 +44,7 @@ async function createApp(
|
||||
const destroy = async () => {
|
||||
services.versionService.destroy();
|
||||
services.clientInstanceService.destroy();
|
||||
services.clientMetricsServiceV2.destroy();
|
||||
services.apiTokenService.destroy();
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user