1
0
mirror of https://github.com/Unleash/unleash.git synced 2025-07-26 13:48:33 +02:00

feat: unique connection counting (#9074)

This commit is contained in:
Mateusz Kwasniewski 2025-01-13 11:56:57 +01:00 committed by GitHub
parent af1b6c8c37
commit e559718581
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 456 additions and 1 deletions

View File

@ -134,6 +134,7 @@
"hash-sum": "^2.0.0",
"helmet": "^6.0.0",
"http-errors": "^2.0.0",
"hyperloglog-lite": "^1.0.2",
"ip-address": "^10.0.1",
"joi": "^17.13.3",
"js-sha256": "^0.11.0",

View File

@ -56,6 +56,7 @@ import { OnboardingStore } from '../features/onboarding/onboarding-store';
import { createOnboardingReadModel } from '../features/onboarding/createOnboardingReadModel';
import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store';
import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model';
import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store';
export const createStores = (
config: IUnleashConfig,
@ -185,6 +186,7 @@ export const createStores = (
),
userUnsubscribeStore: new UserUnsubscribeStore(db),
userSubscriptionsReadModel: new UserSubscriptionsReadModel(db),
uniqueConnectionStore: new UniqueConnectionStore(db),
};
};

View File

@ -32,6 +32,7 @@ export const scheduleServices = async (
frontendApiService,
clientMetricsServiceV2,
integrationEventsService,
uniqueConnectionService,
} = services;
schedulerService.schedule(
@ -179,4 +180,10 @@ export const scheduleServices = async (
minutesToMilliseconds(15),
'cleanUpIntegrationEvents',
);
schedulerService.schedule(
uniqueConnectionService.sync.bind(uniqueConnectionService),
minutesToMilliseconds(10),
'uniqueConnectionService',
);
};

View File

@ -0,0 +1,27 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';
export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
private uniqueConnectionsRecord: Record<string, TimedUniqueConnections> =
{};
async insert(uniqueConnections: UniqueConnections): Promise<void> {
this.uniqueConnectionsRecord[uniqueConnections.id] = {
...uniqueConnections,
updatedAt: new Date(),
};
}
async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}
async deleteAll(): Promise<void> {
this.uniqueConnectionsRecord = {};
}
}

View File

@ -0,0 +1,169 @@
import { UniqueConnectionService } from './unique-connection-service';
import { FakeUniqueConnectionStore } from './fake-unique-connection-store';
import getLogger from '../../../test/fixtures/no-logger';
import type { IFlagResolver } from '../../types';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
import { addHours } from 'date-fns';
import EventEmitter from 'events';
const alwaysOnFlagResolver = {
isEnabled() {
return true;
},
} as unknown as IFlagResolver;
test('sync first current bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
await uniqueConnectionService.sync();
const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
});
test('sync first previous bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
await uniqueConnectionService.sync();
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');
await uniqueConnectionService.sync(addHours(new Date(), 1));
const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
});
test('sync to existing current bucket from the same service', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
await uniqueConnectionService.sync();
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection3');
const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
});
test('sync to existing current bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync();
uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync();
const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 0, current: 3 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 0, current: 3 });
});
test('sync to existing previous bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync(addHours(new Date(), 1));
uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync(addHours(new Date(), 1));
const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 3, current: 0 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 3, current: 0 });
});
test('populate previous and current', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();
uniqueConnectionService.count('connection3');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
uniqueConnectionService.count('connection3');
uniqueConnectionService.count('connection4');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call
const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 2 });
});

View File

@ -0,0 +1,99 @@
import type { IUnleashConfig } from '../../types/option';
import type { IFlagResolver, IUnleashStores } from '../../types';
import type { Logger } from '../../logger';
import type { IUniqueConnectionStore } from './unique-connection-store-type';
import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
// HyperLogLog will create 2^n registers
const n = 12;
export class UniqueConnectionService {
private logger: Logger;
private uniqueConnectionStore: IUniqueConnectionStore;
private flagResolver: IFlagResolver;
private eventBus: EventEmitter;
private activeHour: number;
private hll = HyperLogLog(n);
constructor(
{
uniqueConnectionStore,
}: Pick<IUnleashStores, 'uniqueConnectionStore'>,
config: Pick<IUnleashConfig, 'getLogger' | 'flagResolver' | 'eventBus'>,
) {
this.uniqueConnectionStore = uniqueConnectionStore;
this.logger = config.getLogger('services/unique-connection-service.ts');
this.flagResolver = config.flagResolver;
this.eventBus = config.eventBus;
this.activeHour = new Date().getHours();
}
listen() {
this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}
count(connectionId: string) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
}
async getStats() {
const [previous, current] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
]);
const previousHll = HyperLogLog(n);
if (previous) {
previousHll.merge({ n, buckets: previous.hll });
}
const currentHll = HyperLogLog(n);
if (current) {
currentHll.merge({ n, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
}
async sync(currentTime = new Date()): Promise<void> {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
const currentHour = currentTime.getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');
if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({ n, buckets: currentBucket.hll });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
if (previousBucket) {
this.hll.merge({ n, buckets: previousBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
}
this.activeHour = currentHour;
this.hll = HyperLogLog(n);
} else if (currentBucket) {
this.hll.merge({ n, buckets: currentBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
});
}
}

View File

@ -0,0 +1,14 @@
export type UniqueConnections = {
hll: Buffer;
id: 'current' | 'previous';
};
export type TimedUniqueConnections = UniqueConnections & {
updatedAt: Date;
};
export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
deleteAll(): Promise<void>;
}

View File

@ -0,0 +1,58 @@
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init';
import getLogger from '../../../test/fixtures/no-logger';
import type {
IUniqueConnectionStore,
IUnleashStores,
} from '../../../lib/types';
import HyperLogLog from 'hyperloglog-lite';
let stores: IUnleashStores;
let db: ITestDb;
let uniqueConnectionStore: IUniqueConnectionStore;
beforeAll(async () => {
db = await dbInit('unique_connections_store', getLogger);
stores = db.stores;
uniqueConnectionStore = stores.uniqueConnectionStore;
});
afterAll(async () => {
await db.destroy();
});
beforeEach(async () => {
await uniqueConnectionStore.deleteAll();
});
test('should store empty HyperLogLog buffer', async () => {
const hll = HyperLogLog(12);
await uniqueConnectionStore.insert({
id: 'current',
hll: hll.output().buckets,
});
const fetchedHll = await uniqueConnectionStore.get('current');
hll.merge({ n: 12, buckets: fetchedHll!.hll });
expect(hll.count()).toBe(0);
});
test('should store non empty HyperLogLog buffer', async () => {
const hll = HyperLogLog(12);
hll.add(HyperLogLog.hash('connection-1'));
hll.add(HyperLogLog.hash('connection-2'));
await uniqueConnectionStore.insert({
id: 'current',
hll: hll.output().buckets,
});
const fetchedHll = await uniqueConnectionStore.get('current');
const emptyHll = HyperLogLog(12);
emptyHll.merge({ n: 12, buckets: fetchedHll!.hll });
expect(hll.count()).toBe(2);
});
test('should indicate when no entry', async () => {
const fetchedHll = await uniqueConnectionStore.get('current');
expect(fetchedHll).toBeNull();
});

View File

@ -0,0 +1,34 @@
import type { Db } from '../../db/db';
import type { IUniqueConnectionStore } from '../../types';
import type { UniqueConnections } from './unique-connection-store-type';
export class UniqueConnectionStore implements IUniqueConnectionStore {
private db: Db;
constructor(db: Db) {
this.db = db;
}
async insert(uniqueConnections: UniqueConnections): Promise<void> {
await this.db<UniqueConnections>('unique_connections')
.insert({ id: uniqueConnections.id, hll: uniqueConnections.hll })
.onConflict('id')
.merge();
}
async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
const row = await this.db('unique_connections')
.select('id', 'hll', 'updated_at')
.where('id', id)
.first();
return row
? { id: row.id, hll: row.hll, updatedAt: row.updated_at }
: null;
}
async deleteAll(): Promise<void> {
await this.db('unique_connections').delete();
}
}

View File

@ -1,6 +1,7 @@
import type EventEmitter from 'events';
const REQUEST_TIME = 'request_time';
const SDK_CONNECTION_ID_RECEIVED = 'sdk_connection_id_received';
const DB_TIME = 'db_time';
const FUNCTION_TIME = 'function_time';
const SCHEDULER_JOB_TIME = 'scheduler_job_time';
@ -21,6 +22,7 @@ const CLIENT_DELTA_MEMORY = 'client_delta_memory';
type MetricEvent =
| typeof REQUEST_TIME
| typeof SDK_CONNECTION_ID_RECEIVED
| typeof DB_TIME
| typeof FUNCTION_TIME
| typeof SCHEDULER_JOB_TIME
@ -71,6 +73,7 @@ const onMetricEvent = <T extends MetricEvent>(
export {
REQUEST_TIME,
SDK_CONNECTION_ID_RECEIVED,
DB_TIME,
SCHEDULER_JOB_TIME,
FUNCTION_TIME,

View File

@ -66,6 +66,7 @@ describe('responseTimeMetrics new behavior', () => {
},
method: 'GET',
path: 'should-not-be-used',
headers: {},
};
// @ts-expect-error req and res doesn't have all properties
@ -98,6 +99,7 @@ describe('responseTimeMetrics new behavior', () => {
};
const reqWithoutRoute = {
method: 'GET',
headers: {},
};
// @ts-expect-error req and res doesn't have all properties
@ -132,6 +134,7 @@ describe('responseTimeMetrics new behavior', () => {
};
const reqWithoutRoute = {
method: 'GET',
headers: {},
};
// @ts-expect-error req and res doesn't have all properties
@ -166,6 +169,7 @@ describe('responseTimeMetrics new behavior', () => {
const reqWithoutRoute = {
method: 'GET',
path,
headers: {},
};
// @ts-expect-error req and res doesn't have all properties
@ -210,6 +214,7 @@ describe('responseTimeMetrics new behavior', () => {
const reqWithoutRoute = {
method: 'GET',
path,
headers: {},
};
// @ts-expect-error req and res doesn't have all properties

View File

@ -1,6 +1,6 @@
import * as responseTime from 'response-time';
import type EventEmitter from 'events';
import { REQUEST_TIME } from '../metric-events';
import { REQUEST_TIME, SDK_CONNECTION_ID_RECEIVED } from '../metric-events';
import type { IFlagResolver } from '../types/experimental';
import type { InstanceStatsService } from '../services';
import type { RequestHandler } from 'express';
@ -66,6 +66,11 @@ export function responseTimeMetrics(
req.query.appName;
}
const connectionId = req.headers['x-unleash-connection-id'];
if (connectionId && flagResolver.isEnabled('uniqueSdkTracking')) {
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, connectionId);
}
const timingInfo = {
path: pathname,
method: req.method,

View File

@ -157,6 +157,7 @@ import {
createContextService,
createFakeContextService,
} from '../features/context/createContextService';
import { UniqueConnectionService } from '../features/unique-connection/unique-connection-service';
export const createServices = (
stores: IUnleashStores,
@ -403,6 +404,9 @@ export const createServices = (
const featureLifecycleService = transactionalFeatureLifecycleService;
featureLifecycleService.listen();
const uniqueConnectionService = new UniqueConnectionService(stores, config);
uniqueConnectionService.listen();
const onboardingService = db
? createOnboardingService(config)(db)
: createFakeOnboardingService(config).onboardingService;
@ -484,6 +488,7 @@ export const createServices = (
personalDashboardService,
projectStatusService,
transactionalUserSubscriptionsService,
uniqueConnectionService,
};
};
@ -537,4 +542,5 @@ export {
PersonalDashboardService,
ProjectStatusService,
UserSubscriptionsService,
UniqueConnectionService,
};

View File

@ -59,6 +59,7 @@ import type { OnboardingService } from '../features/onboarding/onboarding-servic
import type { PersonalDashboardService } from '../features/personal-dashboard/personal-dashboard-service';
import type { ProjectStatusService } from '../features/project-status/project-status-service';
import type { UserSubscriptionsService } from '../features/user-subscriptions/user-subscriptions-service';
import type { UniqueConnectionService } from '../features/unique-connection/unique-connection-service';
export interface IUnleashServices {
transactionalAccessService: WithTransactional<AccessService>;
@ -131,4 +132,5 @@ export interface IUnleashServices {
personalDashboardService: PersonalDashboardService;
projectStatusService: ProjectStatusService;
transactionalUserSubscriptionsService: WithTransactional<UserSubscriptionsService>;
uniqueConnectionService: UniqueConnectionService;
}

View File

@ -53,6 +53,7 @@ import { IOnboardingReadModel } from '../features/onboarding/onboarding-read-mod
import { IOnboardingStore } from '../features/onboarding/onboarding-store-type';
import type { IUserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store-type';
import type { IUserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model-type';
import { IUniqueConnectionStore } from '../features/unique-connection/unique-connection-store-type';
export interface IUnleashStores {
accessStore: IAccessStore;
@ -110,6 +111,7 @@ export interface IUnleashStores {
onboardingStore: IOnboardingStore;
userUnsubscribeStore: IUserUnsubscribeStore;
userSubscriptionsReadModel: IUserSubscriptionsReadModel;
uniqueConnectionStore: IUniqueConnectionStore;
}
export {
@ -165,4 +167,5 @@ export {
type IProjectReadModel,
IOnboardingStore,
type IUserSubscriptionsReadModel,
IUniqueConnectionStore,
};

View File

@ -55,6 +55,7 @@ process.nextTick(async () => {
flagOverviewRedesign: false,
granularAdminPermissions: true,
deltaApi: true,
uniqueSdkTracking: true,
},
},
authentication: {

View File

@ -56,6 +56,7 @@ import { FakeOnboardingStore } from '../../lib/features/onboarding/fake-onboardi
import { createFakeOnboardingReadModel } from '../../lib/features/onboarding/createOnboardingReadModel';
import { FakeUserUnsubscribeStore } from '../../lib/features/user-subscriptions/fake-user-unsubscribe-store';
import { FakeUserSubscriptionsReadModel } from '../../lib/features/user-subscriptions/fake-user-subscriptions-read-model';
import { FakeUniqueConnectionStore } from '../../lib/features/unique-connection/fake-unique-connection-store';
const db = {
select: () => ({
@ -121,6 +122,7 @@ const createStores: () => IUnleashStores = () => {
onboardingStore: new FakeOnboardingStore(),
userUnsubscribeStore: new FakeUserUnsubscribeStore(),
userSubscriptionsReadModel: new FakeUserSubscriptionsReadModel(),
uniqueConnectionStore: new FakeUniqueConnectionStore(),
};
};

View File

@ -4919,6 +4919,15 @@ __metadata:
languageName: node
linkType: hard
"hyperloglog-lite@npm:^1.0.2":
version: 1.0.2
resolution: "hyperloglog-lite@npm:1.0.2"
dependencies:
murmurhash32-node: "npm:^1.0.1"
checksum: 10c0/3077b9dba1bac384b842a70d1b17da58449d3e633936ef7bd03a3386613e59c413f5f886d9383d14c3fe31eac524abe28a99025d43c446e57aa4175b17675450
languageName: node
linkType: hard
"iconv-lite@npm:0.4.24":
version: 0.4.24
resolution: "iconv-lite@npm:0.4.24"
@ -6831,6 +6840,13 @@ __metadata:
languageName: node
linkType: hard
"murmurhash32-node@npm:^1.0.1":
version: 1.0.1
resolution: "murmurhash32-node@npm:1.0.1"
checksum: 10c0/06a36a2f0d0c6855ce131c2a5c225c3096f53bf36898eb2683b2200f782577cde07f07485792d6e85798ea74f4dd95e836058fbab07c49cfcbc0a79b168ab654
languageName: node
linkType: hard
"murmurhash3js@npm:^3.0.1":
version: 3.0.1
resolution: "murmurhash3js@npm:3.0.1"
@ -9311,6 +9327,7 @@ __metadata:
helmet: "npm:^6.0.0"
http-errors: "npm:^2.0.0"
husky: "npm:^9.0.11"
hyperloglog-lite: "npm:^1.0.2"
ip-address: "npm:^10.0.1"
jest: "npm:29.7.0"
jest-junit: "npm:^16.0.0"