diff --git a/src/lib/db/index.ts b/src/lib/db/index.ts index 6d2d41e4e0..d704c7c4a5 100644 --- a/src/lib/db/index.ts +++ b/src/lib/db/index.ts @@ -57,6 +57,7 @@ import { createOnboardingReadModel } from '../features/onboarding/createOnboardi import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store'; import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model'; import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store'; +import { UniqueConnectionReadModel } from '../features/unique-connection/unique-connection-read-model'; export const createStores = ( config: IUnleashConfig, @@ -187,6 +188,9 @@ export const createStores = ( userUnsubscribeStore: new UserUnsubscribeStore(db), userSubscriptionsReadModel: new UserSubscriptionsReadModel(db), uniqueConnectionStore: new UniqueConnectionStore(db), + uniqueConnectionReadModel: new UniqueConnectionReadModel( + new UniqueConnectionStore(db), + ), }; }; diff --git a/src/lib/features/unique-connection/hyperloglog-config.ts b/src/lib/features/unique-connection/hyperloglog-config.ts new file mode 100644 index 0000000000..e853a1ed15 --- /dev/null +++ b/src/lib/features/unique-connection/hyperloglog-config.ts @@ -0,0 +1,2 @@ +// HyperLogLog will create 2^n registers +export const REGISTERS_EXPONENT = 12; diff --git a/src/lib/features/unique-connection/unique-connection-read-model-type.ts b/src/lib/features/unique-connection/unique-connection-read-model-type.ts new file mode 100644 index 0000000000..480181591f --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-read-model-type.ts @@ -0,0 +1,3 @@ +export interface IUniqueConnectionReadModel { + getStats(): Promise<{ previous: number; current: number }>; +} diff --git a/src/lib/features/unique-connection/unique-connection-read-model.ts b/src/lib/features/unique-connection/unique-connection-read-model.ts new file mode 100644 index 0000000000..ec63a43dbe --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-read-model.ts @@ -0,0 +1,31 @@ +import type { + IUniqueConnectionReadModel, + IUniqueConnectionStore, +} from '../../types'; + +import HyperLogLog from 'hyperloglog-lite'; +import { REGISTERS_EXPONENT } from './hyperloglog-config'; + +export class UniqueConnectionReadModel implements IUniqueConnectionReadModel { + private uniqueConnectionStore: IUniqueConnectionStore; + + constructor(uniqueConnectionStore: IUniqueConnectionStore) { + this.uniqueConnectionStore = uniqueConnectionStore; + } + + async getStats() { + const [previous, current] = await Promise.all([ + this.uniqueConnectionStore.get('previous'), + this.uniqueConnectionStore.get('current'), + ]); + const previousHll = HyperLogLog(REGISTERS_EXPONENT); + if (previous) { + previousHll.merge({ n: REGISTERS_EXPONENT, buckets: previous.hll }); + } + const currentHll = HyperLogLog(REGISTERS_EXPONENT); + if (current) { + currentHll.merge({ n: REGISTERS_EXPONENT, buckets: current.hll }); + } + return { previous: previousHll.count(), current: currentHll.count() }; + } +} diff --git a/src/lib/features/unique-connection/unique-connection-service.test.ts b/src/lib/features/unique-connection/unique-connection-service.test.ts index 6ab0676ffa..572e510a6c 100644 --- a/src/lib/features/unique-connection/unique-connection-service.test.ts +++ b/src/lib/features/unique-connection/unique-connection-service.test.ts @@ -5,6 +5,7 @@ import type { IFlagResolver } from '../../types'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; import { addHours } from 'date-fns'; import EventEmitter from 'events'; +import { UniqueConnectionReadModel } from './unique-connection-read-model'; const alwaysOnFlagResolver = { isEnabled() { @@ -20,6 +21,9 @@ test('sync first current bucket', async () => { { uniqueConnectionStore }, config, ); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); uniqueConnectionService.listen(); eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); @@ -30,7 +34,7 @@ test('sync first current bucket', async () => { await uniqueConnectionService.sync(); - const stats = await uniqueConnectionService.getStats(); + const stats = await uniqueConnectionReadModel.getStats(); expect(stats).toEqual({ previous: 0, current: 2 }); }); @@ -43,6 +47,9 @@ test('sync first previous bucket', async () => { config, ); uniqueConnectionService.listen(); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); @@ -53,7 +60,7 @@ test('sync first previous bucket', async () => { await uniqueConnectionService.sync(addHours(new Date(), 1)); - const stats = await uniqueConnectionService.getStats(); + const stats = await uniqueConnectionReadModel.getStats(); expect(stats).toEqual({ previous: 3, current: 0 }); }); @@ -66,6 +73,9 @@ test('sync to existing current bucket from the same service', async () => { config, ); uniqueConnectionService.listen(); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); uniqueConnectionService.count('connection1'); uniqueConnectionService.count('connection2'); @@ -75,7 +85,7 @@ test('sync to existing current bucket from the same service', async () => { uniqueConnectionService.count('connection1'); uniqueConnectionService.count('connection3'); - const stats = await uniqueConnectionService.getStats(); + const stats = await uniqueConnectionReadModel.getStats(); expect(stats).toEqual({ previous: 0, current: 3 }); }); @@ -95,6 +105,9 @@ test('sync to existing current bucket from another service', async () => { { uniqueConnectionStore }, config, ); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); uniqueConnectionService1.count('connection1'); uniqueConnectionService1.count('connection2'); @@ -104,10 +117,8 @@ test('sync to existing current bucket from another service', async () => { uniqueConnectionService2.count('connection3'); await uniqueConnectionService2.sync(); - const stats1 = await uniqueConnectionService1.getStats(); - expect(stats1).toEqual({ previous: 0, current: 3 }); - const stats2 = await uniqueConnectionService2.getStats(); - expect(stats2).toEqual({ previous: 0, current: 3 }); + const stats = await uniqueConnectionReadModel.getStats(); + expect(stats).toEqual({ previous: 0, current: 3 }); }); test('sync to existing previous bucket from another service', async () => { @@ -118,6 +129,9 @@ test('sync to existing previous bucket from another service', async () => { eventBus: eventBus, }; const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); const uniqueConnectionService1 = new UniqueConnectionService( { uniqueConnectionStore }, config, @@ -135,10 +149,8 @@ test('sync to existing previous bucket from another service', async () => { uniqueConnectionService2.count('connection3'); await uniqueConnectionService2.sync(addHours(new Date(), 1)); - const stats1 = await uniqueConnectionService1.getStats(); - expect(stats1).toEqual({ previous: 3, current: 0 }); - const stats2 = await uniqueConnectionService2.getStats(); - expect(stats2).toEqual({ previous: 3, current: 0 }); + const stats = await uniqueConnectionReadModel.getStats(); + expect(stats).toEqual({ previous: 3, current: 0 }); }); test('populate previous and current', async () => { @@ -149,6 +161,9 @@ test('populate previous and current', async () => { { uniqueConnectionStore }, config, ); + const uniqueConnectionReadModel = new UniqueConnectionReadModel( + uniqueConnectionStore, + ); uniqueConnectionService.count('connection1'); uniqueConnectionService.count('connection2'); @@ -164,6 +179,6 @@ test('populate previous and current', async () => { await uniqueConnectionService.sync(addHours(new Date(), 1)); await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call - const stats = await uniqueConnectionService.getStats(); + const stats = await uniqueConnectionReadModel.getStats(); expect(stats).toEqual({ previous: 3, current: 2 }); }); diff --git a/src/lib/features/unique-connection/unique-connection-service.ts b/src/lib/features/unique-connection/unique-connection-service.ts index f49903bd45..c8ad19b8cf 100644 --- a/src/lib/features/unique-connection/unique-connection-service.ts +++ b/src/lib/features/unique-connection/unique-connection-service.ts @@ -5,9 +5,7 @@ import type { IUniqueConnectionStore } from './unique-connection-store-type'; import HyperLogLog from 'hyperloglog-lite'; import type EventEmitter from 'events'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; - -// HyperLogLog will create 2^n registers -const n = 12; +import { REGISTERS_EXPONENT } from './hyperloglog-config'; export class UniqueConnectionService { private logger: Logger; @@ -20,7 +18,7 @@ export class UniqueConnectionService { private activeHour: number; - private hll = HyperLogLog(n); + private hll = HyperLogLog(REGISTERS_EXPONENT); constructor( { @@ -44,22 +42,6 @@ export class UniqueConnectionService { this.hll.add(HyperLogLog.hash(connectionId)); } - async getStats() { - const [previous, current] = await Promise.all([ - this.uniqueConnectionStore.get('previous'), - this.uniqueConnectionStore.get('current'), - ]); - const previousHll = HyperLogLog(n); - if (previous) { - previousHll.merge({ n, buckets: previous.hll }); - } - const currentHll = HyperLogLog(n); - if (current) { - currentHll.merge({ n, buckets: current.hll }); - } - return { previous: previousHll.count(), current: currentHll.count() }; - } - async sync(currentTime = new Date()): Promise { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; @@ -68,7 +50,10 @@ export class UniqueConnectionService { if (this.activeHour !== currentHour && currentBucket) { if (currentBucket.updatedAt.getHours() < currentHour) { - this.hll.merge({ n, buckets: currentBucket.hll }); + this.hll.merge({ + n: REGISTERS_EXPONENT, + buckets: currentBucket.hll, + }); await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, id: 'previous', @@ -77,7 +62,10 @@ export class UniqueConnectionService { const previousBucket = await this.uniqueConnectionStore.get('previous'); if (previousBucket) { - this.hll.merge({ n, buckets: previousBucket.hll }); + this.hll.merge({ + n: REGISTERS_EXPONENT, + buckets: previousBucket.hll, + }); } await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, @@ -86,9 +74,12 @@ export class UniqueConnectionService { } this.activeHour = currentHour; - this.hll = HyperLogLog(n); + this.hll = HyperLogLog(REGISTERS_EXPONENT); } else if (currentBucket) { - this.hll.merge({ n, buckets: currentBucket.hll }); + this.hll.merge({ + n: REGISTERS_EXPONENT, + buckets: currentBucket.hll, + }); } await this.uniqueConnectionStore.insert({ diff --git a/src/lib/metrics.ts b/src/lib/metrics.ts index 02b1373336..e504d6e30d 100644 --- a/src/lib/metrics.ts +++ b/src/lib/metrics.ts @@ -269,6 +269,18 @@ export function registerPrometheusMetrics( }, }); + dbMetrics.registerGaugeDbMetric({ + name: 'unique_sdk_connections_total', + help: 'The number of unique SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`', + query: () => { + if (flagResolver.isEnabled('uniqueSdkTracking')) { + return stores.uniqueConnectionReadModel.getStats(); + } + return Promise.resolve({ current: 0, previous: 0 }); + }, + map: (result) => ({ value: result.previous }), + }); + const featureTogglesArchivedTotal = createGauge({ name: 'feature_toggles_archived_total', help: 'Number of archived feature flags', diff --git a/src/lib/types/stores.ts b/src/lib/types/stores.ts index 710b842d48..baf3768e2b 100644 --- a/src/lib/types/stores.ts +++ b/src/lib/types/stores.ts @@ -54,6 +54,7 @@ import { IOnboardingStore } from '../features/onboarding/onboarding-store-type'; import type { IUserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store-type'; import type { IUserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model-type'; import { IUniqueConnectionStore } from '../features/unique-connection/unique-connection-store-type'; +import { IUniqueConnectionReadModel } from '../features/unique-connection/unique-connection-read-model-type'; export interface IUnleashStores { accessStore: IAccessStore; @@ -112,6 +113,7 @@ export interface IUnleashStores { userUnsubscribeStore: IUserUnsubscribeStore; userSubscriptionsReadModel: IUserSubscriptionsReadModel; uniqueConnectionStore: IUniqueConnectionStore; + uniqueConnectionReadModel: IUniqueConnectionReadModel; } export { @@ -168,4 +170,5 @@ export { IOnboardingStore, type IUserSubscriptionsReadModel, IUniqueConnectionStore, + IUniqueConnectionReadModel, }; diff --git a/src/test/fixtures/store.ts b/src/test/fixtures/store.ts index 21038c30f1..d4b0d7b10a 100644 --- a/src/test/fixtures/store.ts +++ b/src/test/fixtures/store.ts @@ -57,6 +57,7 @@ import { createFakeOnboardingReadModel } from '../../lib/features/onboarding/cre import { FakeUserUnsubscribeStore } from '../../lib/features/user-subscriptions/fake-user-unsubscribe-store'; import { FakeUserSubscriptionsReadModel } from '../../lib/features/user-subscriptions/fake-user-subscriptions-read-model'; import { FakeUniqueConnectionStore } from '../../lib/features/unique-connection/fake-unique-connection-store'; +import { UniqueConnectionReadModel } from '../../lib/features/unique-connection/unique-connection-read-model'; const db = { select: () => ({ @@ -65,6 +66,8 @@ const db = { }; const createStores: () => IUnleashStores = () => { + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + return { db, clientApplicationsStore: new FakeClientApplicationsStore(), @@ -122,7 +125,10 @@ const createStores: () => IUnleashStores = () => { onboardingStore: new FakeOnboardingStore(), userUnsubscribeStore: new FakeUserUnsubscribeStore(), userSubscriptionsReadModel: new FakeUserSubscriptionsReadModel(), - uniqueConnectionStore: new FakeUniqueConnectionStore(), + uniqueConnectionStore, + uniqueConnectionReadModel: new UniqueConnectionReadModel( + uniqueConnectionStore, + ), }; };