mirror of
https://github.com/Unleash/unleash.git
synced 2025-01-20 00:08:02 +01:00
feat: shared event emitter (#3241)
This commit is contained in:
parent
090f29fe45
commit
e15aa9795a
@ -3,9 +3,10 @@ import { LogProvider, Logger } from '../logger';
|
|||||||
import { IEventStore } from '../types/stores/event-store';
|
import { IEventStore } from '../types/stores/event-store';
|
||||||
import { ITag } from '../types/model';
|
import { ITag } from '../types/model';
|
||||||
import { SearchEventsSchema } from '../openapi/spec/search-events-schema';
|
import { SearchEventsSchema } from '../openapi/spec/search-events-schema';
|
||||||
import { AnyEventEmitter } from '../util/anyEventEmitter';
|
import { sharedEventEmitter } from '../util/anyEventEmitter';
|
||||||
import { Db } from './db';
|
import { Db } from './db';
|
||||||
import { Knex } from 'knex';
|
import { Knex } from 'knex';
|
||||||
|
import EventEmitter from 'events';
|
||||||
|
|
||||||
const EVENT_COLUMNS = [
|
const EVENT_COLUMNS = [
|
||||||
'id',
|
'id',
|
||||||
@ -76,13 +77,16 @@ export interface IEventTable {
|
|||||||
|
|
||||||
const TABLE = 'events';
|
const TABLE = 'events';
|
||||||
|
|
||||||
class EventStore extends AnyEventEmitter implements IEventStore {
|
class EventStore implements IEventStore {
|
||||||
private db: Db;
|
private db: Db;
|
||||||
|
|
||||||
|
// only one shared event emitter should exist across all event store instances
|
||||||
|
private eventEmitter: EventEmitter = sharedEventEmitter;
|
||||||
|
|
||||||
private logger: Logger;
|
private logger: Logger;
|
||||||
|
|
||||||
|
// a new DB has to be injected per transaction
|
||||||
constructor(db: Db, getLogger: LogProvider) {
|
constructor(db: Db, getLogger: LogProvider) {
|
||||||
super();
|
|
||||||
this.db = db;
|
this.db = db;
|
||||||
this.logger = getLogger('lib/db/event-store.ts');
|
this.logger = getLogger('lib/db/event-store.ts');
|
||||||
}
|
}
|
||||||
@ -93,7 +97,9 @@ class EventStore extends AnyEventEmitter implements IEventStore {
|
|||||||
.insert(this.eventToDbRow(event))
|
.insert(this.eventToDbRow(event))
|
||||||
.returning(EVENT_COLUMNS);
|
.returning(EVENT_COLUMNS);
|
||||||
const savedEvent = this.rowToEvent(rows[0]);
|
const savedEvent = this.rowToEvent(rows[0]);
|
||||||
process.nextTick(() => this.emit(event.type, savedEvent));
|
process.nextTick(() =>
|
||||||
|
this.eventEmitter.emit(event.type, savedEvent),
|
||||||
|
);
|
||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
this.logger.warn(`Failed to store "${event.type}" event: ${error}`);
|
this.logger.warn(`Failed to store "${event.type}" event: ${error}`);
|
||||||
}
|
}
|
||||||
@ -136,7 +142,7 @@ class EventStore extends AnyEventEmitter implements IEventStore {
|
|||||||
.returning(EVENT_COLUMNS);
|
.returning(EVENT_COLUMNS);
|
||||||
const savedEvents = savedRows.map(this.rowToEvent);
|
const savedEvents = savedRows.map(this.rowToEvent);
|
||||||
process.nextTick(() =>
|
process.nextTick(() =>
|
||||||
savedEvents.forEach((e) => this.emit(e.type, e)),
|
savedEvents.forEach((e) => this.eventEmitter.emit(e.type, e)),
|
||||||
);
|
);
|
||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
this.logger.warn(`Failed to store events: ${error}`);
|
this.logger.warn(`Failed to store events: ${error}`);
|
||||||
@ -330,6 +336,28 @@ class EventStore extends AnyEventEmitter implements IEventStore {
|
|||||||
environment: e.environment,
|
environment: e.environment,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setMaxListeners(number: number): EventEmitter {
|
||||||
|
return this.eventEmitter.setMaxListeners(number);
|
||||||
|
}
|
||||||
|
|
||||||
|
on(
|
||||||
|
eventName: string | symbol,
|
||||||
|
listener: (...args: any[]) => void,
|
||||||
|
): EventEmitter {
|
||||||
|
return this.eventEmitter.on(eventName, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
emit(eventName: string | symbol, ...args: any[]): boolean {
|
||||||
|
return this.eventEmitter.emit(eventName, ...args);
|
||||||
|
}
|
||||||
|
|
||||||
|
off(
|
||||||
|
eventName: string | symbol,
|
||||||
|
listener: (...args: any[]) => void,
|
||||||
|
): EventEmitter {
|
||||||
|
return this.eventEmitter.off(eventName, listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default EventStore;
|
export default EventStore;
|
||||||
|
@ -9,7 +9,7 @@ import {
|
|||||||
|
|
||||||
export const addEventHook = (
|
export const addEventHook = (
|
||||||
eventHook: EventHook,
|
eventHook: EventHook,
|
||||||
eventStore: EventEmitter,
|
eventStore: Pick<EventEmitter, 'on'>,
|
||||||
): void => {
|
): void => {
|
||||||
eventStore.on(FEATURE_CREATED, (data) => {
|
eventStore.on(FEATURE_CREATED, (data) => {
|
||||||
eventHook(FEATURE_CREATED, data);
|
eventHook(FEATURE_CREATED, data);
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import { Db } from '../../db/db';
|
import { Db } from '../../db/db';
|
||||||
import { IEventStore, IUnleashConfig } from '../../types';
|
import { IUnleashConfig } from '../../types';
|
||||||
import ExportImportService from './export-import-service';
|
import ExportImportService from './export-import-service';
|
||||||
import { ImportTogglesStore } from './import-toggles-store';
|
import { ImportTogglesStore } from './import-toggles-store';
|
||||||
import FeatureToggleStore from '../../db/feature-toggle-store';
|
import FeatureToggleStore from '../../db/feature-toggle-store';
|
||||||
@ -37,6 +37,7 @@ import FakeEventStore from '../../../test/fixtures/fake-event-store';
|
|||||||
import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store';
|
import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store';
|
||||||
import FakeFeatureEnvironmentStore from '../../../test/fixtures/fake-feature-environment-store';
|
import FakeFeatureEnvironmentStore from '../../../test/fixtures/fake-feature-environment-store';
|
||||||
import FakeStrategiesStore from '../../../test/fixtures/fake-strategies-store';
|
import FakeStrategiesStore from '../../../test/fixtures/fake-strategies-store';
|
||||||
|
import EventStore from '../../db/event-store';
|
||||||
|
|
||||||
export const createFakeExportImportTogglesService = (
|
export const createFakeExportImportTogglesService = (
|
||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
@ -111,7 +112,6 @@ export const createFakeExportImportTogglesService = (
|
|||||||
export const createExportImportTogglesService = (
|
export const createExportImportTogglesService = (
|
||||||
db: Db,
|
db: Db,
|
||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
eventStore: IEventStore,
|
|
||||||
): ExportImportService => {
|
): ExportImportService => {
|
||||||
const { eventBus, getLogger, flagResolver } = config;
|
const { eventBus, getLogger, flagResolver } = config;
|
||||||
const importTogglesStore = new ImportTogglesStore(db);
|
const importTogglesStore = new ImportTogglesStore(db);
|
||||||
@ -139,12 +139,9 @@ export const createExportImportTogglesService = (
|
|||||||
eventBus,
|
eventBus,
|
||||||
getLogger,
|
getLogger,
|
||||||
);
|
);
|
||||||
|
const eventStore = new EventStore(db, getLogger);
|
||||||
const accessService = createAccessService(db, config);
|
const accessService = createAccessService(db, config);
|
||||||
const featureToggleService = createFeatureToggleService(
|
const featureToggleService = createFeatureToggleService(db, config);
|
||||||
db,
|
|
||||||
config,
|
|
||||||
eventStore,
|
|
||||||
);
|
|
||||||
|
|
||||||
const featureTagService = new FeatureTagService(
|
const featureTagService = new FeatureTagService(
|
||||||
{
|
{
|
||||||
|
@ -18,7 +18,7 @@ import { AccessStore } from '../../db/access-store';
|
|||||||
import RoleStore from '../../db/role-store';
|
import RoleStore from '../../db/role-store';
|
||||||
import EnvironmentStore from '../../db/environment-store';
|
import EnvironmentStore from '../../db/environment-store';
|
||||||
import { Db } from '../../db/db';
|
import { Db } from '../../db/db';
|
||||||
import { IEventStore, IUnleashConfig } from '../../types';
|
import { IUnleashConfig } from '../../types';
|
||||||
import FakeEventStore from '../../../test/fixtures/fake-event-store';
|
import FakeEventStore from '../../../test/fixtures/fake-event-store';
|
||||||
import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store';
|
import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store';
|
||||||
import FakeFeatureToggleStore from '../../../test/fixtures/fake-feature-toggle-store';
|
import FakeFeatureToggleStore from '../../../test/fixtures/fake-feature-toggle-store';
|
||||||
@ -33,11 +33,11 @@ import { FakeAccountStore } from '../../../test/fixtures/fake-account-store';
|
|||||||
import FakeAccessStore from '../../../test/fixtures/fake-access-store';
|
import FakeAccessStore from '../../../test/fixtures/fake-access-store';
|
||||||
import FakeRoleStore from '../../../test/fixtures/fake-role-store';
|
import FakeRoleStore from '../../../test/fixtures/fake-role-store';
|
||||||
import FakeEnvironmentStore from '../../../test/fixtures/fake-environment-store';
|
import FakeEnvironmentStore from '../../../test/fixtures/fake-environment-store';
|
||||||
|
import EventStore from '../../db/event-store';
|
||||||
|
|
||||||
export const createFeatureToggleService = (
|
export const createFeatureToggleService = (
|
||||||
db: Db,
|
db: Db,
|
||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
eventStore: IEventStore,
|
|
||||||
): FeatureToggleService => {
|
): FeatureToggleService => {
|
||||||
const { getLogger, eventBus, flagResolver } = config;
|
const { getLogger, eventBus, flagResolver } = config;
|
||||||
const featureStrategiesStore = new FeatureStrategiesStore(
|
const featureStrategiesStore = new FeatureStrategiesStore(
|
||||||
@ -73,6 +73,7 @@ export const createFeatureToggleService = (
|
|||||||
const accessStore = new AccessStore(db, eventBus, getLogger);
|
const accessStore = new AccessStore(db, eventBus, getLogger);
|
||||||
const roleStore = new RoleStore(db, eventBus, getLogger);
|
const roleStore = new RoleStore(db, eventBus, getLogger);
|
||||||
const environmentStore = new EnvironmentStore(db, eventBus, getLogger);
|
const environmentStore = new EnvironmentStore(db, eventBus, getLogger);
|
||||||
|
const eventStore = new EventStore(db, getLogger);
|
||||||
const groupService = new GroupService(
|
const groupService = new GroupService(
|
||||||
{ groupStore, eventStore, accountStore },
|
{ groupStore, eventStore, accountStore },
|
||||||
{ getLogger },
|
{ getLogger },
|
||||||
|
@ -164,10 +164,10 @@ export const createServices = (
|
|||||||
|
|
||||||
// TODO: this is a temporary seam to enable packaging by feature
|
// TODO: this is a temporary seam to enable packaging by feature
|
||||||
const exportImportService = db
|
const exportImportService = db
|
||||||
? createExportImportTogglesService(db, config, stores.eventStore)
|
? createExportImportTogglesService(db, config)
|
||||||
: createFakeExportImportTogglesService(config);
|
: createFakeExportImportTogglesService(config);
|
||||||
const transactionalExportImportService = (txDb: Knex.Transaction) =>
|
const transactionalExportImportService = (txDb: Knex.Transaction) =>
|
||||||
createExportImportTogglesService(txDb, config, stores.eventStore);
|
createExportImportTogglesService(txDb, config);
|
||||||
const userSplashService = new UserSplashService(stores, config);
|
const userSplashService = new UserSplashService(stores, config);
|
||||||
const openApiService = new OpenApiService(config);
|
const openApiService = new OpenApiService(config);
|
||||||
const clientSpecService = new ClientSpecService(config);
|
const clientSpecService = new ClientSpecService(config);
|
||||||
|
@ -4,7 +4,9 @@ import { SearchEventsSchema } from '../../openapi/spec/search-events-schema';
|
|||||||
import EventEmitter from 'events';
|
import EventEmitter from 'events';
|
||||||
import { IQueryOperations } from 'lib/db/event-store';
|
import { IQueryOperations } from 'lib/db/event-store';
|
||||||
|
|
||||||
export interface IEventStore extends Store<IEvent, number>, EventEmitter {
|
export interface IEventStore
|
||||||
|
extends Store<IEvent, number>,
|
||||||
|
Pick<EventEmitter, 'on' | 'setMaxListeners' | 'emit' | 'off'> {
|
||||||
store(event: IBaseEvent): Promise<void>;
|
store(event: IBaseEvent): Promise<void>;
|
||||||
batchStore(events: IBaseEvent[]): Promise<void>;
|
batchStore(events: IBaseEvent[]): Promise<void>;
|
||||||
getEvents(): Promise<IEvent[]>;
|
getEvents(): Promise<IEvent[]>;
|
||||||
|
@ -10,3 +10,5 @@ export class AnyEventEmitter extends EventEmitter {
|
|||||||
return super.emit(type, ...args) || super.emit('', ...args);
|
return super.emit(type, ...args) || super.emit('', ...args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const sharedEventEmitter = new AnyEventEmitter();
|
||||||
|
36
src/test/fixtures/fake-event-store.ts
vendored
36
src/test/fixtures/fake-event-store.ts
vendored
@ -1,28 +1,30 @@
|
|||||||
import { IEventStore } from '../../lib/types/stores/event-store';
|
import { IEventStore } from '../../lib/types/stores/event-store';
|
||||||
import { IEvent } from '../../lib/types/events';
|
import { IEvent } from '../../lib/types/events';
|
||||||
import { AnyEventEmitter } from '../../lib/util/anyEventEmitter';
|
import { sharedEventEmitter } from '../../lib/util/anyEventEmitter';
|
||||||
import { IQueryOperations } from 'lib/db/event-store';
|
import { IQueryOperations } from 'lib/db/event-store';
|
||||||
import { SearchEventsSchema } from '../../lib/openapi';
|
import { SearchEventsSchema } from '../../lib/openapi';
|
||||||
|
import EventEmitter from 'events';
|
||||||
|
|
||||||
class FakeEventStore extends AnyEventEmitter implements IEventStore {
|
class FakeEventStore implements IEventStore {
|
||||||
events: IEvent[];
|
events: IEvent[];
|
||||||
|
|
||||||
|
private eventEmitter: EventEmitter = sharedEventEmitter;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
super();
|
this.eventEmitter.setMaxListeners(0);
|
||||||
this.setMaxListeners(0);
|
|
||||||
this.events = [];
|
this.events = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
store(event: IEvent): Promise<void> {
|
store(event: IEvent): Promise<void> {
|
||||||
this.events.push(event);
|
this.events.push(event);
|
||||||
this.emit(event.type, event);
|
this.eventEmitter.emit(event.type, event);
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
batchStore(events: IEvent[]): Promise<void> {
|
batchStore(events: IEvent[]): Promise<void> {
|
||||||
events.forEach((event) => {
|
events.forEach((event) => {
|
||||||
this.events.push(event);
|
this.events.push(event);
|
||||||
this.emit(event.type, event);
|
this.eventEmitter.emit(event.type, event);
|
||||||
});
|
});
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
@ -88,6 +90,28 @@ class FakeEventStore extends AnyEventEmitter implements IEventStore {
|
|||||||
if (operations) return [];
|
if (operations) return [];
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setMaxListeners(number: number): EventEmitter {
|
||||||
|
return this.eventEmitter.setMaxListeners(number);
|
||||||
|
}
|
||||||
|
|
||||||
|
on(
|
||||||
|
eventName: string | symbol,
|
||||||
|
listener: (...args: any[]) => void,
|
||||||
|
): EventEmitter {
|
||||||
|
return this.eventEmitter.on(eventName, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
emit(eventName: string | symbol, ...args: any[]): boolean {
|
||||||
|
return this.eventEmitter.emit(eventName, ...args);
|
||||||
|
}
|
||||||
|
|
||||||
|
off(
|
||||||
|
eventName: string | symbol,
|
||||||
|
listener: (...args: any[]) => void,
|
||||||
|
): EventEmitter {
|
||||||
|
return this.eventEmitter.off(eventName, listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = FakeEventStore;
|
module.exports = FakeEventStore;
|
||||||
|
Loading…
Reference in New Issue
Block a user