mirror of
				https://github.com/Unleash/unleash.git
				synced 2025-10-27 11:02:16 +01:00 
			
		
		
		
	feat: unique connection gauge metric (#9089)
This commit is contained in:
		
							parent
							
								
									86bbe62abe
								
							
						
					
					
						commit
						ce73190241
					
				| @ -57,6 +57,7 @@ import { createOnboardingReadModel } from '../features/onboarding/createOnboardi | ||||
| import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store'; | ||||
| import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model'; | ||||
| import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store'; | ||||
| import { UniqueConnectionReadModel } from '../features/unique-connection/unique-connection-read-model'; | ||||
| 
 | ||||
| export const createStores = ( | ||||
|     config: IUnleashConfig, | ||||
| @ -187,6 +188,9 @@ export const createStores = ( | ||||
|         userUnsubscribeStore: new UserUnsubscribeStore(db), | ||||
|         userSubscriptionsReadModel: new UserSubscriptionsReadModel(db), | ||||
|         uniqueConnectionStore: new UniqueConnectionStore(db), | ||||
|         uniqueConnectionReadModel: new UniqueConnectionReadModel( | ||||
|             new UniqueConnectionStore(db), | ||||
|         ), | ||||
|     }; | ||||
| }; | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										2
									
								
								src/lib/features/unique-connection/hyperloglog-config.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								src/lib/features/unique-connection/hyperloglog-config.ts
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | ||||
| // HyperLogLog will create 2^n registers
 | ||||
| export const REGISTERS_EXPONENT = 12; | ||||
| @ -0,0 +1,3 @@ | ||||
| export interface IUniqueConnectionReadModel { | ||||
|     getStats(): Promise<{ previous: number; current: number }>; | ||||
| } | ||||
| @ -0,0 +1,31 @@ | ||||
| import type { | ||||
|     IUniqueConnectionReadModel, | ||||
|     IUniqueConnectionStore, | ||||
| } from '../../types'; | ||||
| 
 | ||||
| import HyperLogLog from 'hyperloglog-lite'; | ||||
| import { REGISTERS_EXPONENT } from './hyperloglog-config'; | ||||
| 
 | ||||
| export class UniqueConnectionReadModel implements IUniqueConnectionReadModel { | ||||
|     private uniqueConnectionStore: IUniqueConnectionStore; | ||||
| 
 | ||||
|     constructor(uniqueConnectionStore: IUniqueConnectionStore) { | ||||
|         this.uniqueConnectionStore = uniqueConnectionStore; | ||||
|     } | ||||
| 
 | ||||
|     async getStats() { | ||||
|         const [previous, current] = await Promise.all([ | ||||
|             this.uniqueConnectionStore.get('previous'), | ||||
|             this.uniqueConnectionStore.get('current'), | ||||
|         ]); | ||||
|         const previousHll = HyperLogLog(REGISTERS_EXPONENT); | ||||
|         if (previous) { | ||||
|             previousHll.merge({ n: REGISTERS_EXPONENT, buckets: previous.hll }); | ||||
|         } | ||||
|         const currentHll = HyperLogLog(REGISTERS_EXPONENT); | ||||
|         if (current) { | ||||
|             currentHll.merge({ n: REGISTERS_EXPONENT, buckets: current.hll }); | ||||
|         } | ||||
|         return { previous: previousHll.count(), current: currentHll.count() }; | ||||
|     } | ||||
| } | ||||
| @ -5,6 +5,7 @@ import type { IFlagResolver } from '../../types'; | ||||
| import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; | ||||
| import { addHours } from 'date-fns'; | ||||
| import EventEmitter from 'events'; | ||||
| import { UniqueConnectionReadModel } from './unique-connection-read-model'; | ||||
| 
 | ||||
| const alwaysOnFlagResolver = { | ||||
|     isEnabled() { | ||||
| @ -20,6 +21,9 @@ test('sync first current bucket', async () => { | ||||
|         { uniqueConnectionStore }, | ||||
|         config, | ||||
|     ); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
|     uniqueConnectionService.listen(); | ||||
| 
 | ||||
|     eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); | ||||
| @ -30,7 +34,7 @@ test('sync first current bucket', async () => { | ||||
| 
 | ||||
|     await uniqueConnectionService.sync(); | ||||
| 
 | ||||
|     const stats = await uniqueConnectionService.getStats(); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 0, current: 2 }); | ||||
| }); | ||||
| 
 | ||||
| @ -43,6 +47,9 @@ test('sync first previous bucket', async () => { | ||||
|         config, | ||||
|     ); | ||||
|     uniqueConnectionService.listen(); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
| 
 | ||||
|     eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); | ||||
|     eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); | ||||
| @ -53,7 +60,7 @@ test('sync first previous bucket', async () => { | ||||
| 
 | ||||
|     await uniqueConnectionService.sync(addHours(new Date(), 1)); | ||||
| 
 | ||||
|     const stats = await uniqueConnectionService.getStats(); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 3, current: 0 }); | ||||
| }); | ||||
| 
 | ||||
| @ -66,6 +73,9 @@ test('sync to existing current bucket from the same service', async () => { | ||||
|         config, | ||||
|     ); | ||||
|     uniqueConnectionService.listen(); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
| 
 | ||||
|     uniqueConnectionService.count('connection1'); | ||||
|     uniqueConnectionService.count('connection2'); | ||||
| @ -75,7 +85,7 @@ test('sync to existing current bucket from the same service', async () => { | ||||
|     uniqueConnectionService.count('connection1'); | ||||
|     uniqueConnectionService.count('connection3'); | ||||
| 
 | ||||
|     const stats = await uniqueConnectionService.getStats(); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 0, current: 3 }); | ||||
| }); | ||||
| 
 | ||||
| @ -95,6 +105,9 @@ test('sync to existing current bucket from another service', async () => { | ||||
|         { uniqueConnectionStore }, | ||||
|         config, | ||||
|     ); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
| 
 | ||||
|     uniqueConnectionService1.count('connection1'); | ||||
|     uniqueConnectionService1.count('connection2'); | ||||
| @ -104,10 +117,8 @@ test('sync to existing current bucket from another service', async () => { | ||||
|     uniqueConnectionService2.count('connection3'); | ||||
|     await uniqueConnectionService2.sync(); | ||||
| 
 | ||||
|     const stats1 = await uniqueConnectionService1.getStats(); | ||||
|     expect(stats1).toEqual({ previous: 0, current: 3 }); | ||||
|     const stats2 = await uniqueConnectionService2.getStats(); | ||||
|     expect(stats2).toEqual({ previous: 0, current: 3 }); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 0, current: 3 }); | ||||
| }); | ||||
| 
 | ||||
| test('sync to existing previous bucket from another service', async () => { | ||||
| @ -118,6 +129,9 @@ test('sync to existing previous bucket from another service', async () => { | ||||
|         eventBus: eventBus, | ||||
|     }; | ||||
|     const uniqueConnectionStore = new FakeUniqueConnectionStore(); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
|     const uniqueConnectionService1 = new UniqueConnectionService( | ||||
|         { uniqueConnectionStore }, | ||||
|         config, | ||||
| @ -135,10 +149,8 @@ test('sync to existing previous bucket from another service', async () => { | ||||
|     uniqueConnectionService2.count('connection3'); | ||||
|     await uniqueConnectionService2.sync(addHours(new Date(), 1)); | ||||
| 
 | ||||
|     const stats1 = await uniqueConnectionService1.getStats(); | ||||
|     expect(stats1).toEqual({ previous: 3, current: 0 }); | ||||
|     const stats2 = await uniqueConnectionService2.getStats(); | ||||
|     expect(stats2).toEqual({ previous: 3, current: 0 }); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 3, current: 0 }); | ||||
| }); | ||||
| 
 | ||||
| test('populate previous and current', async () => { | ||||
| @ -149,6 +161,9 @@ test('populate previous and current', async () => { | ||||
|         { uniqueConnectionStore }, | ||||
|         config, | ||||
|     ); | ||||
|     const uniqueConnectionReadModel = new UniqueConnectionReadModel( | ||||
|         uniqueConnectionStore, | ||||
|     ); | ||||
| 
 | ||||
|     uniqueConnectionService.count('connection1'); | ||||
|     uniqueConnectionService.count('connection2'); | ||||
| @ -164,6 +179,6 @@ test('populate previous and current', async () => { | ||||
|     await uniqueConnectionService.sync(addHours(new Date(), 1)); | ||||
|     await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
 | ||||
| 
 | ||||
|     const stats = await uniqueConnectionService.getStats(); | ||||
|     const stats = await uniqueConnectionReadModel.getStats(); | ||||
|     expect(stats).toEqual({ previous: 3, current: 2 }); | ||||
| }); | ||||
|  | ||||
| @ -5,9 +5,7 @@ import type { IUniqueConnectionStore } from './unique-connection-store-type'; | ||||
| import HyperLogLog from 'hyperloglog-lite'; | ||||
| import type EventEmitter from 'events'; | ||||
| import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; | ||||
| 
 | ||||
| // HyperLogLog will create 2^n registers
 | ||||
| const n = 12; | ||||
| import { REGISTERS_EXPONENT } from './hyperloglog-config'; | ||||
| 
 | ||||
| export class UniqueConnectionService { | ||||
|     private logger: Logger; | ||||
| @ -20,7 +18,7 @@ export class UniqueConnectionService { | ||||
| 
 | ||||
|     private activeHour: number; | ||||
| 
 | ||||
|     private hll = HyperLogLog(n); | ||||
|     private hll = HyperLogLog(REGISTERS_EXPONENT); | ||||
| 
 | ||||
|     constructor( | ||||
|         { | ||||
| @ -44,22 +42,6 @@ export class UniqueConnectionService { | ||||
|         this.hll.add(HyperLogLog.hash(connectionId)); | ||||
|     } | ||||
| 
 | ||||
|     async getStats() { | ||||
|         const [previous, current] = await Promise.all([ | ||||
|             this.uniqueConnectionStore.get('previous'), | ||||
|             this.uniqueConnectionStore.get('current'), | ||||
|         ]); | ||||
|         const previousHll = HyperLogLog(n); | ||||
|         if (previous) { | ||||
|             previousHll.merge({ n, buckets: previous.hll }); | ||||
|         } | ||||
|         const currentHll = HyperLogLog(n); | ||||
|         if (current) { | ||||
|             currentHll.merge({ n, buckets: current.hll }); | ||||
|         } | ||||
|         return { previous: previousHll.count(), current: currentHll.count() }; | ||||
|     } | ||||
| 
 | ||||
|     async sync(currentTime = new Date()): Promise<void> { | ||||
|         if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; | ||||
| 
 | ||||
| @ -68,7 +50,10 @@ export class UniqueConnectionService { | ||||
| 
 | ||||
|         if (this.activeHour !== currentHour && currentBucket) { | ||||
|             if (currentBucket.updatedAt.getHours() < currentHour) { | ||||
|                 this.hll.merge({ n, buckets: currentBucket.hll }); | ||||
|                 this.hll.merge({ | ||||
|                     n: REGISTERS_EXPONENT, | ||||
|                     buckets: currentBucket.hll, | ||||
|                 }); | ||||
|                 await this.uniqueConnectionStore.insert({ | ||||
|                     hll: this.hll.output().buckets, | ||||
|                     id: 'previous', | ||||
| @ -77,7 +62,10 @@ export class UniqueConnectionService { | ||||
|                 const previousBucket = | ||||
|                     await this.uniqueConnectionStore.get('previous'); | ||||
|                 if (previousBucket) { | ||||
|                     this.hll.merge({ n, buckets: previousBucket.hll }); | ||||
|                     this.hll.merge({ | ||||
|                         n: REGISTERS_EXPONENT, | ||||
|                         buckets: previousBucket.hll, | ||||
|                     }); | ||||
|                 } | ||||
|                 await this.uniqueConnectionStore.insert({ | ||||
|                     hll: this.hll.output().buckets, | ||||
| @ -86,9 +74,12 @@ export class UniqueConnectionService { | ||||
|             } | ||||
| 
 | ||||
|             this.activeHour = currentHour; | ||||
|             this.hll = HyperLogLog(n); | ||||
|             this.hll = HyperLogLog(REGISTERS_EXPONENT); | ||||
|         } else if (currentBucket) { | ||||
|             this.hll.merge({ n, buckets: currentBucket.hll }); | ||||
|             this.hll.merge({ | ||||
|                 n: REGISTERS_EXPONENT, | ||||
|                 buckets: currentBucket.hll, | ||||
|             }); | ||||
|         } | ||||
| 
 | ||||
|         await this.uniqueConnectionStore.insert({ | ||||
|  | ||||
| @ -269,6 +269,18 @@ export function registerPrometheusMetrics( | ||||
|         }, | ||||
|     }); | ||||
| 
 | ||||
|     dbMetrics.registerGaugeDbMetric({ | ||||
|         name: 'unique_sdk_connections_total', | ||||
|         help: 'The number of unique SDK connections for the full previous hour across all instances. Available only for SDKs reporting `unleash-x-connection-id`', | ||||
|         query: () => { | ||||
|             if (flagResolver.isEnabled('uniqueSdkTracking')) { | ||||
|                 return stores.uniqueConnectionReadModel.getStats(); | ||||
|             } | ||||
|             return Promise.resolve({ current: 0, previous: 0 }); | ||||
|         }, | ||||
|         map: (result) => ({ value: result.previous }), | ||||
|     }); | ||||
| 
 | ||||
|     const featureTogglesArchivedTotal = createGauge({ | ||||
|         name: 'feature_toggles_archived_total', | ||||
|         help: 'Number of archived feature flags', | ||||
|  | ||||
| @ -54,6 +54,7 @@ import { IOnboardingStore } from '../features/onboarding/onboarding-store-type'; | ||||
| import type { IUserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store-type'; | ||||
| import type { IUserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model-type'; | ||||
| import { IUniqueConnectionStore } from '../features/unique-connection/unique-connection-store-type'; | ||||
| import { IUniqueConnectionReadModel } from '../features/unique-connection/unique-connection-read-model-type'; | ||||
| 
 | ||||
| export interface IUnleashStores { | ||||
|     accessStore: IAccessStore; | ||||
| @ -112,6 +113,7 @@ export interface IUnleashStores { | ||||
|     userUnsubscribeStore: IUserUnsubscribeStore; | ||||
|     userSubscriptionsReadModel: IUserSubscriptionsReadModel; | ||||
|     uniqueConnectionStore: IUniqueConnectionStore; | ||||
|     uniqueConnectionReadModel: IUniqueConnectionReadModel; | ||||
| } | ||||
| 
 | ||||
| export { | ||||
| @ -168,4 +170,5 @@ export { | ||||
|     IOnboardingStore, | ||||
|     type IUserSubscriptionsReadModel, | ||||
|     IUniqueConnectionStore, | ||||
|     IUniqueConnectionReadModel, | ||||
| }; | ||||
|  | ||||
							
								
								
									
										8
									
								
								src/test/fixtures/store.ts
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								src/test/fixtures/store.ts
									
									
									
									
										vendored
									
									
								
							| @ -57,6 +57,7 @@ import { createFakeOnboardingReadModel } from '../../lib/features/onboarding/cre | ||||
| import { FakeUserUnsubscribeStore } from '../../lib/features/user-subscriptions/fake-user-unsubscribe-store'; | ||||
| import { FakeUserSubscriptionsReadModel } from '../../lib/features/user-subscriptions/fake-user-subscriptions-read-model'; | ||||
| import { FakeUniqueConnectionStore } from '../../lib/features/unique-connection/fake-unique-connection-store'; | ||||
| import { UniqueConnectionReadModel } from '../../lib/features/unique-connection/unique-connection-read-model'; | ||||
| 
 | ||||
| const db = { | ||||
|     select: () => ({ | ||||
| @ -65,6 +66,8 @@ const db = { | ||||
| }; | ||||
| 
 | ||||
| const createStores: () => IUnleashStores = () => { | ||||
|     const uniqueConnectionStore = new FakeUniqueConnectionStore(); | ||||
| 
 | ||||
|     return { | ||||
|         db, | ||||
|         clientApplicationsStore: new FakeClientApplicationsStore(), | ||||
| @ -122,7 +125,10 @@ const createStores: () => IUnleashStores = () => { | ||||
|         onboardingStore: new FakeOnboardingStore(), | ||||
|         userUnsubscribeStore: new FakeUserUnsubscribeStore(), | ||||
|         userSubscriptionsReadModel: new FakeUserSubscriptionsReadModel(), | ||||
|         uniqueConnectionStore: new FakeUniqueConnectionStore(), | ||||
|         uniqueConnectionStore, | ||||
|         uniqueConnectionReadModel: new UniqueConnectionReadModel( | ||||
|             uniqueConnectionStore, | ||||
|         ), | ||||
|     }; | ||||
| }; | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user