From cbe0ac475c94b0e9c336da3f4855a649afa17d34 Mon Sep 17 00:00:00 2001 From: Mateusz Kwasniewski Date: Wed, 29 Jan 2025 13:31:37 +0100 Subject: [PATCH] feat: separate frontend backend counting (#9167) --- .../fake-unique-connection-store.ts | 3 +- .../unique-connection-read-model-type.ts | 9 +- .../unique-connection-read-model.ts | 50 +++- .../unique-connection-service.test.ts | 231 +++++++++++++++--- .../unique-connection-service.ts | 91 +++++-- .../unique-connection-store-type.ts | 12 +- src/lib/metrics.ts | 26 +- src/lib/middleware/response-time-metrics.ts | 15 +- 8 files changed, 381 insertions(+), 56 deletions(-) diff --git a/src/lib/features/unique-connection/fake-unique-connection-store.ts b/src/lib/features/unique-connection/fake-unique-connection-store.ts index cad87bc227..fbdfe7f2bc 100644 --- a/src/lib/features/unique-connection/fake-unique-connection-store.ts +++ b/src/lib/features/unique-connection/fake-unique-connection-store.ts @@ -1,5 +1,6 @@ import type { IUniqueConnectionStore } from '../../types'; import type { + BucketId, TimedUniqueConnections, UniqueConnections, } from './unique-connection-store-type'; @@ -16,7 +17,7 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore { } async get( - id: 'current' | 'previous', + id: BucketId, ): Promise<(UniqueConnections & { updatedAt: Date }) | null> { return this.uniqueConnectionsRecord[id] || null; } diff --git a/src/lib/features/unique-connection/unique-connection-read-model-type.ts b/src/lib/features/unique-connection/unique-connection-read-model-type.ts index 480181591f..72202b8151 100644 --- a/src/lib/features/unique-connection/unique-connection-read-model-type.ts +++ b/src/lib/features/unique-connection/unique-connection-read-model-type.ts @@ -1,3 +1,10 @@ export interface IUniqueConnectionReadModel { - getStats(): Promise<{ previous: number; current: number }>; + getStats(): Promise<{ + previous: number; + current: number; + previousBackend: number; + currentBackend: number; + previousFrontend: number; + currentFrontend: number; + }>; } diff --git a/src/lib/features/unique-connection/unique-connection-read-model.ts b/src/lib/features/unique-connection/unique-connection-read-model.ts index ec63a43dbe..a2fb1b272f 100644 --- a/src/lib/features/unique-connection/unique-connection-read-model.ts +++ b/src/lib/features/unique-connection/unique-connection-read-model.ts @@ -14,9 +14,20 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel { } async getStats() { - const [previous, current] = await Promise.all([ + const [ + previous, + current, + previousFrontend, + currentFrontend, + previousBackend, + currentBackend, + ] = await Promise.all([ this.uniqueConnectionStore.get('previous'), this.uniqueConnectionStore.get('current'), + this.uniqueConnectionStore.get('previousFrontend'), + this.uniqueConnectionStore.get('currentFrontend'), + this.uniqueConnectionStore.get('previousBackend'), + this.uniqueConnectionStore.get('currentBackend'), ]); const previousHll = HyperLogLog(REGISTERS_EXPONENT); if (previous) { @@ -26,6 +37,41 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel { if (current) { currentHll.merge({ n: REGISTERS_EXPONENT, buckets: current.hll }); } - return { previous: previousHll.count(), current: currentHll.count() }; + const previousFrontendHll = HyperLogLog(REGISTERS_EXPONENT); + if (previousFrontend) { + previousFrontendHll.merge({ + n: REGISTERS_EXPONENT, + buckets: previousFrontend.hll, + }); + } + const currentFrontendHll = HyperLogLog(REGISTERS_EXPONENT); + if (currentFrontend) { + currentFrontendHll.merge({ + n: REGISTERS_EXPONENT, + buckets: currentFrontend.hll, + }); + } + const previousBackendHll = HyperLogLog(REGISTERS_EXPONENT); + if (previousBackend) { + previousBackendHll.merge({ + n: REGISTERS_EXPONENT, + buckets: previousBackend.hll, + }); + } + const currentBackendHll = HyperLogLog(REGISTERS_EXPONENT); + if (currentBackend) { + currentBackendHll.merge({ + n: REGISTERS_EXPONENT, + buckets: currentBackend.hll, + }); + } + return { + previous: previousHll.count(), + current: currentHll.count(), + previousFrontend: previousFrontendHll.count(), + currentFrontend: currentFrontendHll.count(), + previousBackend: previousBackendHll.count(), + currentBackend: currentBackendHll.count(), + }; } } diff --git a/src/lib/features/unique-connection/unique-connection-service.test.ts b/src/lib/features/unique-connection/unique-connection-service.test.ts index 572e510a6c..88c0641aea 100644 --- a/src/lib/features/unique-connection/unique-connection-service.test.ts +++ b/src/lib/features/unique-connection/unique-connection-service.test.ts @@ -26,16 +26,38 @@ test('sync first current bucket', async () => { ); uniqueConnectionService.listen(); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection1', + type: 'backend', + }); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection1', + type: 'backend', + }); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection2', + type: 'backend', + }); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection2', + type: 'backend', + }); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService.sync(); const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 0, current: 2 }); + expect(stats).toEqual({ + previous: 0, + current: 2, + previousBackend: 0, + currentBackend: 2, + previousFrontend: 0, + currentFrontend: 0, + }); }); test('sync first previous bucket', async () => { @@ -51,17 +73,33 @@ test('sync first previous bucket', async () => { uniqueConnectionStore, ); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection1', + type: 'backend', + }); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService.sync(); - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId: 'connection3', + type: 'backend', + }); await uniqueConnectionService.sync(addHours(new Date(), 1)); const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 3, current: 0 }); + expect(stats).toEqual({ + previous: 3, + current: 0, + previousBackend: 3, + currentBackend: 0, + previousFrontend: 0, + currentFrontend: 0, + }); }); test('sync to existing current bucket from the same service', async () => { @@ -77,16 +115,35 @@ test('sync to existing current bucket from the same service', async () => { uniqueConnectionStore, ); - uniqueConnectionService.count('connection1'); - uniqueConnectionService.count('connection2'); + uniqueConnectionService.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService.sync(); - uniqueConnectionService.count('connection1'); - uniqueConnectionService.count('connection3'); + uniqueConnectionService.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection3', + type: 'backend', + }); const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 0, current: 3 }); + expect(stats).toEqual({ + previous: 0, + current: 3, + previousBackend: 0, + currentBackend: 3, + previousFrontend: 0, + currentFrontend: 0, + }); }); test('sync to existing current bucket from another service', async () => { @@ -109,16 +166,35 @@ test('sync to existing current bucket from another service', async () => { uniqueConnectionStore, ); - uniqueConnectionService1.count('connection1'); - uniqueConnectionService1.count('connection2'); + uniqueConnectionService1.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService1.count({ + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService1.sync(); - uniqueConnectionService2.count('connection1'); - uniqueConnectionService2.count('connection3'); + uniqueConnectionService2.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService2.count({ + connectionId: 'connection3', + type: 'backend', + }); await uniqueConnectionService2.sync(); const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 0, current: 3 }); + expect(stats).toEqual({ + previous: 0, + current: 3, + previousBackend: 0, + currentBackend: 3, + previousFrontend: 0, + currentFrontend: 0, + }); }); test('sync to existing previous bucket from another service', async () => { @@ -141,16 +217,35 @@ test('sync to existing previous bucket from another service', async () => { config, ); - uniqueConnectionService1.count('connection1'); - uniqueConnectionService1.count('connection2'); + uniqueConnectionService1.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService1.count({ + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService1.sync(addHours(new Date(), 1)); - uniqueConnectionService2.count('connection1'); - uniqueConnectionService2.count('connection3'); + uniqueConnectionService2.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService2.count({ + connectionId: 'connection3', + type: 'backend', + }); await uniqueConnectionService2.sync(addHours(new Date(), 1)); const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 3, current: 0 }); + expect(stats).toEqual({ + previous: 3, + current: 0, + previousBackend: 3, + currentBackend: 0, + previousFrontend: 0, + currentFrontend: 0, + }); }); test('populate previous and current', async () => { @@ -165,20 +260,94 @@ test('populate previous and current', async () => { uniqueConnectionStore, ); - uniqueConnectionService.count('connection1'); - uniqueConnectionService.count('connection2'); + uniqueConnectionService.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection2', + type: 'backend', + }); await uniqueConnectionService.sync(); await uniqueConnectionService.sync(); - uniqueConnectionService.count('connection3'); + uniqueConnectionService.count({ + connectionId: 'connection3', + type: 'backend', + }); await uniqueConnectionService.sync(addHours(new Date(), 1)); await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call - uniqueConnectionService.count('connection3'); - uniqueConnectionService.count('connection4'); + uniqueConnectionService.count({ + connectionId: 'connection3', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection4', + type: 'backend', + }); await uniqueConnectionService.sync(addHours(new Date(), 1)); await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call const stats = await uniqueConnectionReadModel.getStats(); - expect(stats).toEqual({ previous: 3, current: 2 }); + expect(stats).toEqual({ + previous: 3, + current: 2, + previousBackend: 3, + currentBackend: 2, + previousFrontend: 0, + currentFrontend: 0, + }); +}); + +test('populate all buckets', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); + + uniqueConnectionService.count({ + connectionId: 'connection1', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection2', + type: 'frontend', + }); + await uniqueConnectionService.sync(); + await uniqueConnectionService.sync(); + + uniqueConnectionService.count({ + connectionId: 'connection3', + type: 'backend', + }); + await uniqueConnectionService.sync(addHours(new Date(), 1)); + await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call + + uniqueConnectionService.count({ + connectionId: 'connection3', + type: 'backend', + }); + uniqueConnectionService.count({ + connectionId: 'connection4', + type: 'frontend', + }); + await uniqueConnectionService.sync(addHours(new Date(), 1)); + await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call + + const stats = await uniqueConnectionReadModel.getStats(); + expect(stats).toEqual({ + previous: 3, + current: 2, + previousBackend: 2, + currentBackend: 1, + previousFrontend: 1, + currentFrontend: 1, + }); }); diff --git a/src/lib/features/unique-connection/unique-connection-service.ts b/src/lib/features/unique-connection/unique-connection-service.ts index c8ad19b8cf..435c77102e 100644 --- a/src/lib/features/unique-connection/unique-connection-service.ts +++ b/src/lib/features/unique-connection/unique-connection-service.ts @@ -1,7 +1,10 @@ import type { IUnleashConfig } from '../../types/option'; import type { IFlagResolver, IUnleashStores } from '../../types'; import type { Logger } from '../../logger'; -import type { IUniqueConnectionStore } from './unique-connection-store-type'; +import type { + BucketId, + IUniqueConnectionStore, +} from './unique-connection-store-type'; import HyperLogLog from 'hyperloglog-lite'; import type EventEmitter from 'events'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; @@ -20,6 +23,10 @@ export class UniqueConnectionService { private hll = HyperLogLog(REGISTERS_EXPONENT); + private backendHll = HyperLogLog(REGISTERS_EXPONENT); + + private frontendHll = HyperLogLog(REGISTERS_EXPONENT); + constructor( { uniqueConnectionStore, @@ -37,54 +44,106 @@ export class UniqueConnectionService { this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this)); } - count(connectionId: string) { + count({ + connectionId, + type, + }: { connectionId: string; type: 'frontend' | 'backend' }) { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; - this.hll.add(HyperLogLog.hash(connectionId)); + const value = HyperLogLog.hash(connectionId); + this.hll.add(value); + if (type === 'frontend') { + this.frontendHll.add(value); + } else if (type === 'backend') { + this.backendHll.add(value); + } } async sync(currentTime = new Date()): Promise { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; const currentHour = currentTime.getHours(); - const currentBucket = await this.uniqueConnectionStore.get('current'); + + await this.syncBuckets(currentTime, 'current', 'previous'); + await this.syncBuckets( + currentTime, + 'currentBackend', + 'previousBackend', + ); + await this.syncBuckets( + currentTime, + 'currentFrontend', + 'previousFrontend', + ); + + if (this.activeHour !== currentHour) { + this.activeHour = currentHour; + } + } + + private resetHll(bucketId: BucketId) { + if (bucketId.toLowerCase().includes('frontend')) { + this.frontendHll = HyperLogLog(REGISTERS_EXPONENT); + } else if (bucketId.toLowerCase().includes('backend')) { + this.backendHll = HyperLogLog(REGISTERS_EXPONENT); + } else { + this.hll = HyperLogLog(REGISTERS_EXPONENT); + } + } + + private getHll(bucketId: BucketId) { + if (bucketId.toLowerCase().includes('frontend')) { + return this.frontendHll; + } else if (bucketId.toLowerCase().includes('backend')) { + return this.backendHll; + } else { + return this.hll; + } + } + + private async syncBuckets( + currentTime: Date, + current: BucketId, + previous: BucketId, + ): Promise { + const currentHour = currentTime.getHours(); + const currentBucket = await this.uniqueConnectionStore.get(current); if (this.activeHour !== currentHour && currentBucket) { if (currentBucket.updatedAt.getHours() < currentHour) { - this.hll.merge({ + this.getHll(current).merge({ n: REGISTERS_EXPONENT, buckets: currentBucket.hll, }); await this.uniqueConnectionStore.insert({ - hll: this.hll.output().buckets, - id: 'previous', + hll: this.getHll(current).output().buckets, + id: previous, }); } else { const previousBucket = - await this.uniqueConnectionStore.get('previous'); + await this.uniqueConnectionStore.get(previous); if (previousBucket) { - this.hll.merge({ + this.getHll(current).merge({ n: REGISTERS_EXPONENT, buckets: previousBucket.hll, }); } await this.uniqueConnectionStore.insert({ - hll: this.hll.output().buckets, - id: 'previous', + hll: this.getHll(current).output().buckets, + id: previous, }); } - this.activeHour = currentHour; - this.hll = HyperLogLog(REGISTERS_EXPONENT); + this.resetHll(current); } else if (currentBucket) { - this.hll.merge({ + this.getHll(current).merge({ n: REGISTERS_EXPONENT, buckets: currentBucket.hll, }); } await this.uniqueConnectionStore.insert({ - hll: this.hll.output().buckets, - id: 'current', + hll: this.getHll(current).output().buckets, + id: current, }); } } diff --git a/src/lib/features/unique-connection/unique-connection-store-type.ts b/src/lib/features/unique-connection/unique-connection-store-type.ts index e691a794f8..fe46ca6416 100644 --- a/src/lib/features/unique-connection/unique-connection-store-type.ts +++ b/src/lib/features/unique-connection/unique-connection-store-type.ts @@ -1,14 +1,22 @@ export type UniqueConnections = { hll: Buffer; - id: 'current' | 'previous'; + id: BucketId; }; export type TimedUniqueConnections = UniqueConnections & { updatedAt: Date; }; +export type BucketId = + | 'current' + | 'previous' + | 'currentBackend' + | 'previousBackend' + | 'currentFrontend' + | 'previousFrontend'; + export interface IUniqueConnectionStore { insert(uniqueConnections: UniqueConnections): Promise; - get(id: 'current' | 'previous'): Promise; + get(id: BucketId): Promise; deleteAll(): Promise; } diff --git a/src/lib/metrics.ts b/src/lib/metrics.ts index f34d5fb589..3fb0969a69 100644 --- a/src/lib/metrics.ts +++ b/src/lib/metrics.ts @@ -276,11 +276,35 @@ export function registerPrometheusMetrics( if (flagResolver.isEnabled('uniqueSdkTracking')) { return stores.uniqueConnectionReadModel.getStats(); } - return Promise.resolve({ current: 0, previous: 0 }); + return Promise.resolve({ previous: 0 }); }, map: (result) => ({ value: result.previous }), }); + dbMetrics.registerGaugeDbMetric({ + name: 'unique_backend_sdk_connections_total', + help: 'The number of unique backend SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`', + query: () => { + if (flagResolver.isEnabled('uniqueSdkTracking')) { + return stores.uniqueConnectionReadModel.getStats(); + } + return Promise.resolve({ previousBackend: 0 }); + }, + map: (result) => ({ value: result.previousBackend }), + }); + + dbMetrics.registerGaugeDbMetric({ + name: 'unique_frontend_sdk_connections_total', + help: 'The number of unique frontend SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`', + query: () => { + if (flagResolver.isEnabled('uniqueSdkTracking')) { + return stores.uniqueConnectionReadModel.getStats(); + } + return Promise.resolve({ previousFrontend: 0 }); + }, + map: (result) => ({ value: result.previousFrontend }), + }); + const featureTogglesArchivedTotal = createGauge({ name: 'feature_toggles_archived_total', help: 'Number of archived feature flags', diff --git a/src/lib/middleware/response-time-metrics.ts b/src/lib/middleware/response-time-metrics.ts index cae5a48c8a..c7dc5fe9d5 100644 --- a/src/lib/middleware/response-time-metrics.ts +++ b/src/lib/middleware/response-time-metrics.ts @@ -69,8 +69,19 @@ export function responseTimeMetrics( if (flagResolver.isEnabled('uniqueSdkTracking')) { const connectionId = req.headers['x-unleash-connection-id'] || - `${req.headers['unleash-instanceid']}${req.ip}`; - eventBus.emit(SDK_CONNECTION_ID_RECEIVED, connectionId); + req.headers['unleash-instanceid']; + if (req.url.includes('/api/client') && connectionId) { + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId, + type: 'backend', + }); + } + if (req.url.includes('/api/frontend') && connectionId) { + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, { + connectionId, + type: 'frontend', + }); + } } const timingInfo = {