1
0
mirror of https://github.com/Unleash/unleash.git synced 2025-02-09 00:18:00 +01:00

feat: separate frontend backend counting (#9167)

This commit is contained in:
Mateusz Kwasniewski 2025-01-29 13:31:37 +01:00 committed by GitHub
parent 7ca8cc2276
commit cbe0ac475c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 381 additions and 56 deletions

View File

@@ -1,5 +1,6 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
BucketId,
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';
@@ -16,7 +17,7 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
}
async get(
id: 'current' | 'previous',
id: BucketId,
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}

View File

@@ -1,3 +1,10 @@
export interface IUniqueConnectionReadModel {
getStats(): Promise<{ previous: number; current: number }>;
getStats(): Promise<{
previous: number;
current: number;
previousBackend: number;
currentBackend: number;
previousFrontend: number;
currentFrontend: number;
}>;
}

View File

@@ -14,9 +14,20 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel {
}
async getStats() {
const [previous, current] = await Promise.all([
const [
previous,
current,
previousFrontend,
currentFrontend,
previousBackend,
currentBackend,
] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
this.uniqueConnectionStore.get('previousFrontend'),
this.uniqueConnectionStore.get('currentFrontend'),
this.uniqueConnectionStore.get('previousBackend'),
this.uniqueConnectionStore.get('currentBackend'),
]);
const previousHll = HyperLogLog(REGISTERS_EXPONENT);
if (previous) {
@@ -26,6 +37,41 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel {
if (current) {
currentHll.merge({ n: REGISTERS_EXPONENT, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
const previousFrontendHll = HyperLogLog(REGISTERS_EXPONENT);
if (previousFrontend) {
previousFrontendHll.merge({
n: REGISTERS_EXPONENT,
buckets: previousFrontend.hll,
});
}
const currentFrontendHll = HyperLogLog(REGISTERS_EXPONENT);
if (currentFrontend) {
currentFrontendHll.merge({
n: REGISTERS_EXPONENT,
buckets: currentFrontend.hll,
});
}
const previousBackendHll = HyperLogLog(REGISTERS_EXPONENT);
if (previousBackend) {
previousBackendHll.merge({
n: REGISTERS_EXPONENT,
buckets: previousBackend.hll,
});
}
const currentBackendHll = HyperLogLog(REGISTERS_EXPONENT);
if (currentBackend) {
currentBackendHll.merge({
n: REGISTERS_EXPONENT,
buckets: currentBackend.hll,
});
}
return {
previous: previousHll.count(),
current: currentHll.count(),
previousFrontend: previousFrontendHll.count(),
currentFrontend: currentFrontendHll.count(),
previousBackend: previousBackendHll.count(),
currentBackend: currentBackendHll.count(),
};
}
}

View File

@@ -26,16 +26,38 @@ test('sync first current bucket', async () => {
);
uniqueConnectionService.listen();
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService.sync();
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
expect(stats).toEqual({
previous: 0,
current: 2,
previousBackend: 0,
currentBackend: 2,
previousFrontend: 0,
currentFrontend: 0,
});
});
test('sync first previous bucket', async () => {
@@ -51,17 +73,33 @@ test('sync first previous bucket', async () => {
uniqueConnectionStore,
);
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService.sync();
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
expect(stats).toEqual({
previous: 3,
current: 0,
previousBackend: 3,
currentBackend: 0,
previousFrontend: 0,
currentFrontend: 0,
});
});
test('sync to existing current bucket from the same service', async () => {
@@ -77,16 +115,35 @@ test('sync to existing current bucket from the same service', async () => {
uniqueConnectionStore,
);
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService.sync();
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection3');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
expect(stats).toEqual({
previous: 0,
current: 3,
previousBackend: 0,
currentBackend: 3,
previousFrontend: 0,
currentFrontend: 0,
});
});
test('sync to existing current bucket from another service', async () => {
@@ -109,16 +166,35 @@ test('sync to existing current bucket from another service', async () => {
uniqueConnectionStore,
);
uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
uniqueConnectionService1.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService1.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService1.sync();
uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
uniqueConnectionService2.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService2.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService2.sync();
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
expect(stats).toEqual({
previous: 0,
current: 3,
previousBackend: 0,
currentBackend: 3,
previousFrontend: 0,
currentFrontend: 0,
});
});
test('sync to existing previous bucket from another service', async () => {
@@ -141,16 +217,35 @@ test('sync to existing previous bucket from another service', async () => {
config,
);
uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
uniqueConnectionService1.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService1.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService1.sync(addHours(new Date(), 1));
uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
uniqueConnectionService2.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService2.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService2.sync(addHours(new Date(), 1));
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
expect(stats).toEqual({
previous: 3,
current: 0,
previousBackend: 3,
currentBackend: 0,
previousFrontend: 0,
currentFrontend: 0,
});
});
test('populate previous and current', async () => {
@@ -165,20 +260,94 @@ test('populate previous and current', async () => {
uniqueConnectionStore,
);
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();
uniqueConnectionService.count('connection3');
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
uniqueConnectionService.count('connection3');
uniqueConnectionService.count('connection4');
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection4',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 2 });
expect(stats).toEqual({
previous: 3,
current: 2,
previousBackend: 3,
currentBackend: 2,
previousFrontend: 0,
currentFrontend: 0,
});
});
// Verifies that frontend and backend connections are tracked in separate
// HLL buckets while still contributing to the combined previous/current totals.
test('populate all buckets', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionReadModel = new UniqueConnectionReadModel(
uniqueConnectionStore,
);
// First window: one backend and one frontend connection.
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'frontend',
});
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();
// Still the same window: a second backend connection.
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
// Syncing one hour later rotates the current buckets into the previous ones.
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
// New window: connection3 repeats (backend) and connection4 is new (frontend).
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection4',
type: 'frontend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
const stats = await uniqueConnectionReadModel.getStats();
// previous = connections 1-3 (2 backend + 1 frontend); current = connection3
// (backend) and connection4 (frontend). Note: HLL counts are approximate, but
// exact for these tiny cardinalities.
expect(stats).toEqual({
previous: 3,
current: 2,
previousBackend: 2,
currentBackend: 1,
previousFrontend: 1,
currentFrontend: 1,
});
});

View File

@@ -1,7 +1,10 @@
import type { IUnleashConfig } from '../../types/option';
import type { IFlagResolver, IUnleashStores } from '../../types';
import type { Logger } from '../../logger';
import type { IUniqueConnectionStore } from './unique-connection-store-type';
import type {
BucketId,
IUniqueConnectionStore,
} from './unique-connection-store-type';
import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
@@ -20,6 +23,10 @@ export class UniqueConnectionService {
private hll = HyperLogLog(REGISTERS_EXPONENT);
private backendHll = HyperLogLog(REGISTERS_EXPONENT);
private frontendHll = HyperLogLog(REGISTERS_EXPONENT);
constructor(
{
uniqueConnectionStore,
@@ -37,54 +44,106 @@
this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}
count(connectionId: string) {
count({
connectionId,
type,
}: { connectionId: string; type: 'frontend' | 'backend' }) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
const value = HyperLogLog.hash(connectionId);
this.hll.add(value);
if (type === 'frontend') {
this.frontendHll.add(value);
} else if (type === 'backend') {
this.backendHll.add(value);
}
}
async sync(currentTime = new Date()): Promise<void> {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
const currentHour = currentTime.getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');
await this.syncBuckets(currentTime, 'current', 'previous');
await this.syncBuckets(
currentTime,
'currentBackend',
'previousBackend',
);
await this.syncBuckets(
currentTime,
'currentFrontend',
'previousFrontend',
);
if (this.activeHour !== currentHour) {
this.activeHour = currentHour;
}
}
private resetHll(bucketId: BucketId) {
if (bucketId.toLowerCase().includes('frontend')) {
this.frontendHll = HyperLogLog(REGISTERS_EXPONENT);
} else if (bucketId.toLowerCase().includes('backend')) {
this.backendHll = HyperLogLog(REGISTERS_EXPONENT);
} else {
this.hll = HyperLogLog(REGISTERS_EXPONENT);
}
}
private getHll(bucketId: BucketId) {
if (bucketId.toLowerCase().includes('frontend')) {
return this.frontendHll;
} else if (bucketId.toLowerCase().includes('backend')) {
return this.backendHll;
} else {
return this.hll;
}
}
private async syncBuckets(
currentTime: Date,
current: BucketId,
previous: BucketId,
): Promise<void> {
const currentHour = currentTime.getHours();
const currentBucket = await this.uniqueConnectionStore.get(current);
if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({
this.getHll(current).merge({
n: REGISTERS_EXPONENT,
buckets: currentBucket.hll,
});
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
hll: this.getHll(current).output().buckets,
id: previous,
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
await this.uniqueConnectionStore.get(previous);
if (previousBucket) {
this.hll.merge({
this.getHll(current).merge({
n: REGISTERS_EXPONENT,
buckets: previousBucket.hll,
});
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
hll: this.getHll(current).output().buckets,
id: previous,
});
}
this.activeHour = currentHour;
this.hll = HyperLogLog(REGISTERS_EXPONENT);
this.resetHll(current);
} else if (currentBucket) {
this.hll.merge({
this.getHll(current).merge({
n: REGISTERS_EXPONENT,
buckets: currentBucket.hll,
});
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
hll: this.getHll(current).output().buckets,
id: current,
});
}
}

View File

@@ -1,14 +1,22 @@
export type UniqueConnections = {
hll: Buffer;
id: 'current' | 'previous';
id: BucketId;
};
export type TimedUniqueConnections = UniqueConnections & {
updatedAt: Date;
};
// Identifies one of the six HLL storage buckets: a combined pair plus
// per-SDK-type (backend/frontend) pairs, each split into the bucket being
// filled ('current*') and the last completed one ('previous*').
export type BucketId =
| 'current'
| 'previous'
| 'currentBackend'
| 'previousBackend'
| 'currentFrontend'
| 'previousFrontend';
export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
get(id: BucketId): Promise<TimedUniqueConnections | null>;
deleteAll(): Promise<void>;
}

View File

@@ -276,11 +276,35 @@ export function registerPrometheusMetrics(
if (flagResolver.isEnabled('uniqueSdkTracking')) {
return stores.uniqueConnectionReadModel.getStats();
}
return Promise.resolve({ current: 0, previous: 0 });
return Promise.resolve({ previous: 0 });
},
map: (result) => ({ value: result.previous }),
});
dbMetrics.registerGaugeDbMetric({
name: 'unique_backend_sdk_connections_total',
help: 'The number of unique backend SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`',
query: () => {
if (flagResolver.isEnabled('uniqueSdkTracking')) {
return stores.uniqueConnectionReadModel.getStats();
}
return Promise.resolve({ previousBackend: 0 });
},
map: (result) => ({ value: result.previousBackend }),
});
dbMetrics.registerGaugeDbMetric({
name: 'unique_frontend_sdk_connections_total',
help: 'The number of unique frontend SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`',
query: () => {
if (flagResolver.isEnabled('uniqueSdkTracking')) {
return stores.uniqueConnectionReadModel.getStats();
}
return Promise.resolve({ previousFrontend: 0 });
},
map: (result) => ({ value: result.previousFrontend }),
});
const featureTogglesArchivedTotal = createGauge({
name: 'feature_toggles_archived_total',
help: 'Number of archived feature flags',

View File

@@ -69,8 +69,19 @@ export function responseTimeMetrics(
if (flagResolver.isEnabled('uniqueSdkTracking')) {
const connectionId =
req.headers['x-unleash-connection-id'] ||
`${req.headers['unleash-instanceid']}${req.ip}`;
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, connectionId);
req.headers['unleash-instanceid'];
if (req.url.includes('/api/client') && connectionId) {
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId,
type: 'backend',
});
}
if (req.url.includes('/api/frontend') && connectionId) {
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId,
type: 'frontend',
});
}
}
const timingInfo = {