1
0
mirror of https://github.com/Unleash/unleash.git synced 2025-01-25 00:07:47 +01:00

feat: segment implementation in delta

This commit is contained in:
sjaanus 2025-01-24 08:30:13 +02:00
parent 280710f22a
commit 8642acb091
No known key found for this signature in database
GPG Key ID: 20E007C0248BA7FF
12 changed files with 259 additions and 93 deletions

View File

@ -41,7 +41,6 @@ import {
} from '../../internals';
import isEqual from 'lodash.isequal';
import { diff } from 'json-diff';
import type { DeltaHydrationEvent } from './delta/client-feature-toggle-delta-types';
const version = 2;
@ -192,9 +191,11 @@ export default class FeatureController extends Controller {
const sortedToggles = features.sort((a, b) =>
a.name.localeCompare(b.name),
);
const sortedSegments = segments.sort(
(a, b) => Number(a.id) - Number(b.id),
);
if (delta?.events[0].type === 'hydration') {
const hydrationEvent: DeltaHydrationEvent =
delta?.events[0];
const hydrationEvent = delta?.events[0];
const sortedNewToggles = hydrationEvent.features.sort(
(a, b) => a.name.localeCompare(b.name),
);
@ -214,6 +215,24 @@ export default class FeatureController extends Controller {
)}`,
);
}
const sortedNewSegments = hydrationEvent.segments.sort(
(a, b) => Number(a.id) - Number(b.id),
);
if (
!this.deepEqualIgnoreOrder(
sortedToggles,
sortedNewToggles,
)
) {
this.logger.warn(
`old features and new features are different for segments. Old count ${
segments.length
}, new count ${hydrationEvent.segments.length}, query ${JSON.stringify(query)},
diff ${JSON.stringify(
diff(sortedSegments, sortedNewSegments),
)}`,
);
}
} else {
this.logger.warn(
`Delta diff should have only hydration event, query ${JSON.stringify(query)}`,

View File

@ -7,6 +7,7 @@ import {
} from '../../../../test/e2e/helpers/test-helper';
import getLogger from '../../../../test/fixtures/no-logger';
import { DEFAULT_ENV } from '../../../util/constants';
import { DELTA_EVENT_TYPES } from './client-feature-toggle-delta-types';
let app: IUnleashTest;
let db: ITestDb;
@ -121,7 +122,7 @@ test('should return correct delta after feature created', async () => {
expect(body).toMatchObject({
events: [
{
type: 'hydration',
type: DELTA_EVENT_TYPES.HYDRATION,
features: [
{
name: 'base_feature',
@ -134,8 +135,6 @@ test('should return correct delta after feature created', async () => {
await app.createFeature('new_feature');
await syncRevisions();
//@ts-ignore
await app.services.clientFeatureToggleService.clientFeatureToggleDelta.onUpdateRevisionEvent();
const { body: deltaBody } = await app.request
.get('/api/client/delta')
@ -145,13 +144,7 @@ test('should return correct delta after feature created', async () => {
expect(deltaBody).toMatchObject({
events: [
{
type: 'feature-updated',
feature: {
name: 'new_feature',
},
},
{
type: 'feature-updated',
type: DELTA_EVENT_TYPES.FEATURE_UPDATED,
feature: {
name: 'new_feature',
},
@ -161,7 +154,9 @@ test('should return correct delta after feature created', async () => {
});
const syncRevisions = async () => {
await app.services.configurationRevisionService.updateMaxRevisionId();
await app.services.configurationRevisionService.updateMaxRevisionId(false);
//@ts-ignore
await app.services.clientFeatureToggleService.clientFeatureToggleDelta.onUpdateRevisionEvent();
};
test('archived features should not be returned as updated', async () => {
@ -187,7 +182,6 @@ test('archived features should not be returned as updated', async () => {
await app.archiveFeature('base_feature');
await syncRevisions();
await app.createFeature('new_feature');
await syncRevisions();
await app.getProjectFeatures('new_feature'); // TODO: this is silly, but events syncing and tests do not work nicely. this is basically a setTimeout
@ -199,11 +193,11 @@ test('archived features should not be returned as updated', async () => {
expect(deltaBody).toMatchObject({
events: [
{
type: 'feature-removed',
type: DELTA_EVENT_TYPES.FEATURE_REMOVED,
featureName: 'base_feature',
},
{
type: 'feature-updated',
type: DELTA_EVENT_TYPES.FEATURE_UPDATED,
feature: {
name: 'new_feature',
},
@ -211,3 +205,67 @@ test('archived features should not be returned as updated', async () => {
],
});
});
test('should get segment updated and removed events', async () => {
await app.createFeature('base_feature');
await syncRevisions();
const { body, headers } = await app.request
.get('/api/client/delta')
.expect(200);
const etag = headers.etag;
expect(body).toMatchObject({
events: [
{
features: [
{
name: 'base_feature',
},
],
},
],
});
const { body: segmentBody } = await app.createSegment({
name: 'my_segment_a',
constraints: [],
});
// we need this, because revision service does not fire event for segment creation
await app.createFeature('not_important1');
await syncRevisions();
await app.updateSegment(segmentBody.id, {
name: 'a',
constraints: [],
});
await syncRevisions();
await app.deleteSegment(segmentBody.id);
// we need this, because revision service does not fire event for segment deletion
await app.createFeature('not_important2');
await syncRevisions();
const { body: deltaBody } = await app.request
.get('/api/client/delta')
.set('If-None-Match', etag)
.expect(200);
expect(deltaBody).toMatchObject({
events: [
{
type: DELTA_EVENT_TYPES.FEATURE_UPDATED,
},
{
type: DELTA_EVENT_TYPES.SEGMENT_UPDATED,
},
{
type: DELTA_EVENT_TYPES.SEGMENT_UPDATED,
},
{
type: DELTA_EVENT_TYPES.FEATURE_UPDATED,
},
{
type: DELTA_EVENT_TYPES.SEGMENT_REMOVED,
},
],
});
});

View File

@ -5,6 +5,7 @@ export type DeltaHydrationEvent = {
eventId: number;
type: 'hydration';
features: ClientFeatureSchema[];
segments: IClientSegment[];
};
export type DeltaEvent =
@ -50,14 +51,11 @@ export const isDeltaFeatureRemovedEvent = (
return event.type === DELTA_EVENT_TYPES.FEATURE_REMOVED;
};
export const isDeltaSegmentUpdatedEvent = (
export const isDeltaSegmentEvent = (
event: DeltaEvent,
): event is Extract<DeltaEvent, { type: 'segment-updated' }> => {
return event.type === DELTA_EVENT_TYPES.SEGMENT_UPDATED;
};
export const isDeltaSegmentRemovedEvent = (
event: DeltaEvent,
): event is Extract<DeltaEvent, { type: 'segment-removed' }> => {
return event.type === DELTA_EVENT_TYPES.SEGMENT_REMOVED;
return (
event.type === DELTA_EVENT_TYPES.SEGMENT_UPDATED ||
event.type === DELTA_EVENT_TYPES.SEGMENT_REMOVED
);
};

View File

@ -85,6 +85,7 @@ test('project filter removes features not in project in hydration', () => {
const revisionList: DeltaHydrationEvent = {
eventId: 1,
type: 'hydration',
segments: [],
features: [
mockAdd({ name: 'feature1', project: 'project1' }),
mockAdd({ name: 'feature2', project: 'project2' }),
@ -101,6 +102,7 @@ test('project filter removes features not in project in hydration', () => {
expect(revisions).toEqual({
eventId: 1,
type: 'hydration',
segments: [],
features: [mockAdd({ name: 'myfeature2', project: 'project2' })],
});
});

View File

@ -1,5 +1,4 @@
import type {
IClientSegment,
IEventStore,
IFeatureToggleDeltaQuery,
IFeatureToggleQuery,
@ -24,6 +23,7 @@ import {
type DeltaHydrationEvent,
isDeltaFeatureRemovedEvent,
isDeltaFeatureUpdatedEvent,
isDeltaSegmentEvent,
} from './client-feature-toggle-delta-types';
type EnvironmentRevisions = Record<string, DeltaCache>;
@ -58,7 +58,9 @@ export const filterEventsByQuery = (
return targetedEvents.filter((revision) => {
return (
startsWithPrefix(revision) && (allProjects || isInProject(revision))
isDeltaSegmentEvent(revision) ||
(startsWithPrefix(revision) &&
(allProjects || isInProject(revision)))
);
});
};
@ -69,11 +71,12 @@ export const filterHydrationEventByQuery = (
namePrefix: string,
): DeltaHydrationEvent => {
const allProjects = projects.includes('*');
const { type, features, eventId } = event;
const { type, features, eventId, segments } = event;
return {
eventId,
type,
segments,
features: features.filter((feature) => {
return (
feature.name.startsWith(namePrefix) &&
@ -88,8 +91,6 @@ export class ClientFeatureToggleDelta {
private delta: EnvironmentRevisions = {};
private segments: IClientSegment[];
private eventStore: IEventStore;
private currentRevisionId: number = 0;
@ -124,7 +125,6 @@ export class ClientFeatureToggleDelta {
this.delta = {};
this.initRevisionId();
this.updateSegments();
this.configurationRevisionService.on(
UPDATE_REVISION,
this.onUpdateRevisionEvent,
@ -151,10 +151,6 @@ export class ClientFeatureToggleDelta {
if (!hasDelta) {
await this.initEnvironmentDelta(environment);
}
const hasSegments = this.segments;
if (!hasSegments) {
await this.updateSegments();
}
if (requiredRevisionId >= this.currentRevisionId) {
return undefined;
}
@ -167,12 +163,7 @@ export class ClientFeatureToggleDelta {
);
const response: ClientFeaturesDeltaSchema = {
events: [
{
...filteredEvent,
segments: this.segments,
},
],
events: [filteredEvent],
};
return Promise.resolve(response);
@ -202,7 +193,6 @@ export class ClientFeatureToggleDelta {
public async onUpdateRevisionEvent() {
if (this.flagResolver.isEnabled('deltaApi')) {
await this.updateFeaturesDelta();
await this.updateSegments();
this.storeFootprint();
}
}
@ -246,21 +236,32 @@ export class ClientFeatureToggleDelta {
project: event.project!,
}));
// TODO: implement single segment fetching
// const segmentsUpdated = changeEvents
// .filter((event) => event.type === 'segment-updated')
// .map((event) => ({
// name: event.featureName!,
// project: event.project!,
// }));
//
// const segmentsRemoved = changeEvents
// .filter((event) => event.type === 'segment-deleted')
// .map((event) => ({
// name: event.featureName!,
// project: event.project!,
// }));
//
const segmentsUpdated = changeEvents
.filter((event) =>
['segment-created', 'segment-updated'].includes(event.type),
)
.map((event) => event.data.id);
const segmentsRemoved = changeEvents
.filter((event) => event.type === 'segment-deleted')
.map((event) => event.preData.id);
const segments =
await this.segmentReadModel.getAllForClient(segmentsUpdated);
const segmentsUpdatedEvents: DeltaEvent[] = segments.map((segment) => ({
eventId: latestRevision,
type: DELTA_EVENT_TYPES.SEGMENT_UPDATED,
segment,
}));
const segmentsRemovedEvents: DeltaEvent[] = segmentsRemoved.map(
(segmentId) => ({
eventId: latestRevision,
type: DELTA_EVENT_TYPES.SEGMENT_REMOVED,
segmentId,
}),
);
// TODO: we might want to only update the environments that had events changed for performance
for (const environment of keys) {
@ -278,6 +279,8 @@ export class ClientFeatureToggleDelta {
this.delta[environment].addEvents([
...featuresUpdatedEvents,
...featuresRemovedEvents,
...segmentsUpdatedEvents,
...segmentsRemovedEvents,
]);
}
this.currentRevisionId = latestRevision;
@ -287,6 +290,9 @@ export class ClientFeatureToggleDelta {
environment: string,
toggles: string[],
): Promise<FeatureConfigurationDeltaClient[]> {
if (toggles.length === 0) {
return [];
}
return this.getClientFeatures({
toggleNames: toggles,
environment,
@ -298,6 +304,7 @@ export class ClientFeatureToggleDelta {
const baseFeatures = await this.getClientFeatures({
environment,
});
const baseSegments = await this.segmentReadModel.getActiveForClient();
this.currentRevisionId =
await this.configurationRevisionService.getMaxRevisionId();
@ -306,6 +313,7 @@ export class ClientFeatureToggleDelta {
eventId: this.currentRevisionId,
type: DELTA_EVENT_TYPES.HYDRATION,
features: baseFeatures,
segments: baseSegments,
});
this.storeFootprint();
@ -319,15 +327,9 @@ export class ClientFeatureToggleDelta {
return result;
}
private async updateSegments(): Promise<void> {
this.segments = await this.segmentReadModel.getActiveForClient();
}
storeFootprint() {
try {
const featuresMemory = this.getCacheSizeInBytes(this.delta);
const segmentsMemory = this.getCacheSizeInBytes(this.segments);
const memory = featuresMemory + segmentsMemory;
const memory = this.getCacheSizeInBytes(this.delta);
this.eventBus.emit(CLIENT_DELTA_MEMORY, { memory });
} catch (e) {
this.logger.error('Client delta footprint error', e);

View File

@ -55,6 +55,18 @@ describe('RevisionCache', () => {
},
],
type: 'hydration',
segments: [
{
id: 1,
name: 'update-segment',
constraints: [],
},
{
id: 2,
name: 'remove-segment',
constraints: [],
},
],
};
const initialEvents: DeltaEvent[] = [
{
@ -90,7 +102,7 @@ describe('RevisionCache', () => {
deltaCache.addEvents(initialEvents);
// Add a new revision to trigger changeBase
deltaCache.addEvents([
const addedEvents: DeltaEvent[] = [
{
eventId: 3,
type: 'feature-updated',
@ -122,12 +134,37 @@ describe('RevisionCache', () => {
featureName: 'test-flag',
project: 'default',
},
]);
{
eventId: 5,
type: 'segment-updated',
segment: {
id: 1,
name: 'update-segment-new',
constraints: [],
},
},
{
eventId: 6,
type: 'segment-removed',
segmentId: 2,
},
{
eventId: 7,
type: 'segment-updated',
segment: {
id: 3,
name: 'new-segment',
constraints: [],
},
},
];
deltaCache.addEvents(addedEvents);
const events = deltaCache.getEvents();
// Check that the base has been changed and merged correctly
expect(events.length).toBe(2);
expect(events.length).toBe(maxLength);
expect(events).toEqual(addedEvents.slice(-2));
const hydrationEvent = deltaCache.getHydrationEvent();
expect(hydrationEvent.features).toHaveLength(2);
@ -137,5 +174,11 @@ describe('RevisionCache', () => {
expect.objectContaining({ name: 'another-feature-flag' }),
]),
);
expect(hydrationEvent.segments).toEqual(
expect.arrayContaining([
expect.objectContaining({ name: 'update-segment-new', id: 1 }),
expect.objectContaining({ name: 'new-segment' }),
]),
);
});
});

View File

@ -4,14 +4,6 @@ import {
type DeltaHydrationEvent,
} from './client-feature-toggle-delta-types';
const mergeWithoutDuplicates = (arr1: any[], arr2: any[]) => {
const map = new Map();
arr1.concat(arr2).forEach((item) => {
map.set(item.name, item);
});
return Array.from(map.values());
};
export class DeltaCache {
private events: DeltaEvent[] = [];
private maxLength: number;
@ -27,7 +19,7 @@ export class DeltaCache {
this.updateHydrationEvent(events);
while (this.events.length > this.maxLength) {
this.events.splice(1, 1);
this.events.shift();
}
}
@ -62,15 +54,23 @@ export class DeltaCache {
break;
}
case DELTA_EVENT_TYPES.SEGMENT_UPDATED: {
// TODO: segments do not exist in this scope, need to do it in different location
const segmentToUpdate = this.hydrationEvent.segments.find(
(segment) => segment.id === appliedEvent.segment.id,
);
if (segmentToUpdate) {
Object.assign(segmentToUpdate, appliedEvent.segment);
} else {
this.hydrationEvent.segments.push(appliedEvent.segment);
}
break;
}
case DELTA_EVENT_TYPES.SEGMENT_REMOVED: {
// TODO: segments do not exist in this scope, need to do it in different location
this.hydrationEvent.segments =
this.hydrationEvent.segments.filter(
(segment) => segment.id !== appliedEvent.segmentId,
);
break;
}
default:
// TODO: something is seriously wrong
}
}
}

View File

@ -4,6 +4,8 @@ import {
type IBaseEvent,
type IEvent,
type IEventType,
SEGMENT_CREATED,
SEGMENT_DELETED,
SEGMENT_UPDATED,
} from '../../types/events';
import type { Logger, LogProvider } from '../../logger';
@ -214,6 +216,8 @@ class EventStore implements IEventStore {
SEGMENT_UPDATED,
FEATURE_IMPORT,
FEATURES_IMPORTED,
SEGMENT_CREATED,
SEGMENT_DELETED,
]),
)
.orderBy('id', 'asc');

View File

@ -1,6 +1,5 @@
import type { Logger } from '../../logger';
import type {
IEvent,
IEventStore,
IFlagResolver,
IUnleashConfig,
@ -60,7 +59,7 @@ export default class ConfigurationRevisionService extends EventEmitter {
}
}
async updateMaxRevisionId(): Promise<number> {
async updateMaxRevisionId(emit: boolean = true): Promise<number> {
if (this.flagResolver.isEnabled('disableUpdateMaxRevisionId')) {
return 0;
}
@ -74,16 +73,14 @@ export default class ConfigurationRevisionService extends EventEmitter {
revisionId,
);
this.revisionId = revisionId;
this.emit(UPDATE_REVISION, revisionId);
if (emit) {
this.emit(UPDATE_REVISION, revisionId);
}
}
return this.revisionId;
}
async getRevisionRange(start: number, end: number): Promise<IEvent[]> {
return this.eventStore.getRevisionRange(start, end);
}
destroy(): void {
ConfigurationRevisionService.instance?.removeAllListeners();
}

View File

@ -5,8 +5,9 @@ import type {
} from '../../types';
export interface ISegmentReadModel {
getAll(): Promise<ISegment[]>;
getAll(ids?: number[]): Promise<ISegment[]>;
getAllFeatureStrategySegments(): Promise<IFeatureStrategySegment[]>;
getActive(): Promise<ISegment[]>;
getActiveForClient(): Promise<IClientSegment[]>;
getAllForClient(ids?: number[]): Promise<IClientSegment[]>;
}

View File

@ -61,12 +61,17 @@ export class SegmentReadModel implements ISegmentReadModel {
};
}
async getAll(): Promise<ISegment[]> {
const rows: ISegmentRow[] = await this.db
async getAll(ids?: number[]): Promise<ISegment[]> {
let query = this.db
.select(this.prefixColumns())
.from('segments')
.orderBy('segments.name', 'asc');
if (ids && ids.length > 0) {
query = query.whereIn('id', ids);
}
const rows = await query;
return rows.map(this.mapRow);
}
@ -82,7 +87,7 @@ export class SegmentReadModel implements ISegmentReadModel {
}
async getActive(): Promise<ISegment[]> {
const rows: ISegmentRow[] = await this.db
const query = this.db
.distinct(this.prefixColumns())
.from('segments')
.orderBy('name', 'asc')
@ -91,7 +96,7 @@ export class SegmentReadModel implements ISegmentReadModel {
'feature_strategy_segment.segment_id',
'segments.id',
);
const rows: ISegmentRow[] = await query;
return rows.map(this.mapRow);
}
@ -104,4 +109,14 @@ export class SegmentReadModel implements ISegmentReadModel {
constraints: segments.constraints,
}));
}
async getAllForClient(ids?: number[]): Promise<IClientSegment[]> {
const fullSegments = await this.getAll(ids);
return fullSegments.map((segments) => ({
id: segments.id,
name: segments.name,
constraints: segments.constraints,
}));
}
}

View File

@ -117,6 +117,15 @@ export interface IUnleashHttpAPI {
getRecordedEvents(): supertest.Test;
createSegment(postData: object, expectStatusCode?: number): supertest.Test;
deleteSegment(
segmentId: number,
expectedResponseCode?: number,
): supertest.Test;
updateSegment(
segmentId: number,
postData: object,
expectStatusCode?: number,
): supertest.Test;
}
function httpApis(
@ -288,7 +297,25 @@ function httpApis(
.set('Content-Type', 'application/json')
.expect(expectedResponseCode);
},
deleteSegment(
segmentId: number,
expectedResponseCode = 204,
): supertest.Test {
return request
.delete(`/api/admin/segments/${segmentId}`)
.set('Content-Type', 'application/json')
.expect(expectedResponseCode);
},
updateSegment(
segmentId: number,
postData: object,
expectStatusCode = 204,
): supertest.Test {
return request
.put(`/api/admin/segments/${segmentId}`)
.send(postData)
.expect(expectStatusCode);
},
getRecordedEvents(
project: string | null = null,
expectedResponseCode: number = 200,