From e15aa9795a6eea4e2c6c30f747e1ac99344ab821 Mon Sep 17 00:00:00 2001 From: Mateusz Kwasniewski Date: Thu, 2 Mar 2023 09:52:19 +0100 Subject: [PATCH] feat: shared event emitter (#3241) --- src/lib/db/event-store.ts | 38 ++++++++++++++++--- src/lib/event-hook.ts | 2 +- .../createExportImportService.ts | 11 ++---- .../createFeatureToggleService.ts | 5 ++- src/lib/services/index.ts | 4 +- src/lib/types/stores/event-store.ts | 4 +- src/lib/util/anyEventEmitter.ts | 2 + src/test/fixtures/fake-event-store.ts | 36 +++++++++++++++--- 8 files changed, 78 insertions(+), 24 deletions(-) diff --git a/src/lib/db/event-store.ts b/src/lib/db/event-store.ts index 7df413a55e..344a79ca76 100644 --- a/src/lib/db/event-store.ts +++ b/src/lib/db/event-store.ts @@ -3,9 +3,10 @@ import { LogProvider, Logger } from '../logger'; import { IEventStore } from '../types/stores/event-store'; import { ITag } from '../types/model'; import { SearchEventsSchema } from '../openapi/spec/search-events-schema'; -import { AnyEventEmitter } from '../util/anyEventEmitter'; +import { sharedEventEmitter } from '../util/anyEventEmitter'; import { Db } from './db'; import { Knex } from 'knex'; +import EventEmitter from 'events'; const EVENT_COLUMNS = [ 'id', @@ -76,13 +77,16 @@ export interface IEventTable { const TABLE = 'events'; -class EventStore extends AnyEventEmitter implements IEventStore { +class EventStore implements IEventStore { private db: Db; + // only one shared event emitter should exist across all event store instances + private eventEmitter: EventEmitter = sharedEventEmitter; + private logger: Logger; + // a new DB has to be injected per transaction constructor(db: Db, getLogger: LogProvider) { - super(); this.db = db; this.logger = getLogger('lib/db/event-store.ts'); } @@ -93,7 +97,9 @@ class EventStore extends AnyEventEmitter implements IEventStore { .insert(this.eventToDbRow(event)) .returning(EVENT_COLUMNS); const savedEvent = this.rowToEvent(rows[0]); - process.nextTick(() => this.emit(event.type, savedEvent)); + process.nextTick(() => + this.eventEmitter.emit(event.type, savedEvent), + ); } catch (error: unknown) { this.logger.warn(`Failed to store "${event.type}" event: ${error}`); } @@ -136,7 +142,7 @@ class EventStore extends AnyEventEmitter implements IEventStore { .returning(EVENT_COLUMNS); const savedEvents = savedRows.map(this.rowToEvent); process.nextTick(() => - savedEvents.forEach((e) => this.emit(e.type, e)), + savedEvents.forEach((e) => this.eventEmitter.emit(e.type, e)), ); } catch (error: unknown) { this.logger.warn(`Failed to store events: ${error}`); @@ -330,6 +336,28 @@ class EventStore extends AnyEventEmitter implements IEventStore { environment: e.environment, }; } + + setMaxListeners(number: number): EventEmitter { + return this.eventEmitter.setMaxListeners(number); + } + + on( + eventName: string | symbol, + listener: (...args: any[]) => void, + ): EventEmitter { + return this.eventEmitter.on(eventName, listener); + } + + emit(eventName: string | symbol, ...args: any[]): boolean { + return this.eventEmitter.emit(eventName, ...args); + } + + off( + eventName: string | symbol, + listener: (...args: any[]) => void, + ): EventEmitter { + return this.eventEmitter.off(eventName, listener); + } } export default EventStore; diff --git a/src/lib/event-hook.ts b/src/lib/event-hook.ts index 6b7363a986..cf6bc2a847 100644 --- a/src/lib/event-hook.ts +++ b/src/lib/event-hook.ts @@ -9,7 +9,7 @@ import { export const addEventHook = ( eventHook: EventHook, - eventStore: EventEmitter, + eventStore: Pick, ): void => { eventStore.on(FEATURE_CREATED, (data) => { eventHook(FEATURE_CREATED, data); diff --git a/src/lib/features/export-import-toggles/createExportImportService.ts b/src/lib/features/export-import-toggles/createExportImportService.ts index f40f4f218b..fe04184cef 100644 --- a/src/lib/features/export-import-toggles/createExportImportService.ts +++ b/src/lib/features/export-import-toggles/createExportImportService.ts @@ -1,5 +1,5 @@ import { Db } from '../../db/db'; -import { IEventStore, IUnleashConfig } from '../../types'; +import { IUnleashConfig } from '../../types'; import ExportImportService from './export-import-service'; import { ImportTogglesStore } from './import-toggles-store'; import FeatureToggleStore from '../../db/feature-toggle-store'; @@ -37,6 +37,7 @@ import FakeEventStore from '../../../test/fixtures/fake-event-store'; import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store'; import FakeFeatureEnvironmentStore from '../../../test/fixtures/fake-feature-environment-store'; import FakeStrategiesStore from '../../../test/fixtures/fake-strategies-store'; +import EventStore from '../../db/event-store'; export const createFakeExportImportTogglesService = ( config: IUnleashConfig, @@ -111,7 +112,6 @@ export const createFakeExportImportTogglesService = ( export const createExportImportTogglesService = ( db: Db, config: IUnleashConfig, - eventStore: IEventStore, ): ExportImportService => { const { eventBus, getLogger, flagResolver } = config; const importTogglesStore = new ImportTogglesStore(db); @@ -139,12 +139,9 @@ export const createExportImportTogglesService = ( eventBus, getLogger, ); + const eventStore = new EventStore(db, getLogger); const accessService = createAccessService(db, config); - const featureToggleService = createFeatureToggleService( - db, - config, - eventStore, - ); + const featureToggleService = createFeatureToggleService(db, config); const featureTagService = new FeatureTagService( { diff --git a/src/lib/features/feature-toggle/createFeatureToggleService.ts b/src/lib/features/feature-toggle/createFeatureToggleService.ts index 3472c906c0..1b2930ef9f 100644 --- a/src/lib/features/feature-toggle/createFeatureToggleService.ts +++ b/src/lib/features/feature-toggle/createFeatureToggleService.ts @@ -18,7 +18,7 @@ import { AccessStore } from '../../db/access-store'; import RoleStore from '../../db/role-store'; import EnvironmentStore from '../../db/environment-store'; import { Db } from '../../db/db'; -import { IEventStore, IUnleashConfig } from '../../types'; +import { IUnleashConfig } from '../../types'; import FakeEventStore from '../../../test/fixtures/fake-event-store'; import FakeFeatureStrategiesStore from '../../../test/fixtures/fake-feature-strategies-store'; import FakeFeatureToggleStore from '../../../test/fixtures/fake-feature-toggle-store'; @@ -33,11 +33,11 @@ import { FakeAccountStore } from '../../../test/fixtures/fake-account-store'; import FakeAccessStore from '../../../test/fixtures/fake-access-store'; import FakeRoleStore from '../../../test/fixtures/fake-role-store'; import FakeEnvironmentStore from '../../../test/fixtures/fake-environment-store'; +import EventStore from '../../db/event-store'; export const createFeatureToggleService = ( db: Db, config: IUnleashConfig, - eventStore: IEventStore, ): FeatureToggleService => { const { getLogger, eventBus, flagResolver } = config; const featureStrategiesStore = new FeatureStrategiesStore( @@ -73,6 +73,7 @@ export const createFeatureToggleService = ( const accessStore = new AccessStore(db, eventBus, getLogger); const roleStore = new RoleStore(db, eventBus, getLogger); const environmentStore = new EnvironmentStore(db, eventBus, getLogger); + const eventStore = new EventStore(db, getLogger); const groupService = new GroupService( { groupStore, eventStore, accountStore }, { getLogger }, diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index cdc3358cb7..b455e190a8 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -164,10 +164,10 @@ export const createServices = ( // TODO: this is a temporary seam to enable packaging by feature const exportImportService = db - ? createExportImportTogglesService(db, config, stores.eventStore) + ? createExportImportTogglesService(db, config) : createFakeExportImportTogglesService(config); const transactionalExportImportService = (txDb: Knex.Transaction) => - createExportImportTogglesService(txDb, config, stores.eventStore); + createExportImportTogglesService(txDb, config); const userSplashService = new UserSplashService(stores, config); const openApiService = new OpenApiService(config); const clientSpecService = new ClientSpecService(config); diff --git a/src/lib/types/stores/event-store.ts b/src/lib/types/stores/event-store.ts index 5a92287b19..ec3c01cb6a 100644 --- a/src/lib/types/stores/event-store.ts +++ b/src/lib/types/stores/event-store.ts @@ -4,7 +4,9 @@ import { SearchEventsSchema } from '../../openapi/spec/search-events-schema'; import EventEmitter from 'events'; import { IQueryOperations } from 'lib/db/event-store'; -export interface IEventStore extends Store, EventEmitter { +export interface IEventStore + extends Store, + Pick { store(event: IBaseEvent): Promise; batchStore(events: IBaseEvent[]): Promise; getEvents(): Promise; diff --git a/src/lib/util/anyEventEmitter.ts b/src/lib/util/anyEventEmitter.ts index 76ccaa710f..1e3c9bdd90 100644 --- a/src/lib/util/anyEventEmitter.ts +++ b/src/lib/util/anyEventEmitter.ts @@ -10,3 +10,5 @@ export class AnyEventEmitter extends EventEmitter { return super.emit(type, ...args) || super.emit('', ...args); } } + +export const sharedEventEmitter = new AnyEventEmitter(); diff --git a/src/test/fixtures/fake-event-store.ts b/src/test/fixtures/fake-event-store.ts index e24ab43760..5b260440f4 100644 --- a/src/test/fixtures/fake-event-store.ts +++ b/src/test/fixtures/fake-event-store.ts @@ -1,28 +1,30 @@ import { IEventStore } from '../../lib/types/stores/event-store'; import { IEvent } from '../../lib/types/events'; -import { AnyEventEmitter } from '../../lib/util/anyEventEmitter'; +import { sharedEventEmitter } from '../../lib/util/anyEventEmitter'; import { IQueryOperations } from 'lib/db/event-store'; import { SearchEventsSchema } from '../../lib/openapi'; +import EventEmitter from 'events'; -class FakeEventStore extends AnyEventEmitter implements IEventStore { +class FakeEventStore implements IEventStore { events: IEvent[]; + private eventEmitter: EventEmitter = sharedEventEmitter; + constructor() { - super(); - this.setMaxListeners(0); + this.eventEmitter.setMaxListeners(0); this.events = []; } store(event: IEvent): Promise { this.events.push(event); - this.emit(event.type, event); + this.eventEmitter.emit(event.type, event); return Promise.resolve(); } batchStore(events: IEvent[]): Promise { events.forEach((event) => { this.events.push(event); - this.emit(event.type, event); + this.eventEmitter.emit(event.type, event); }); return Promise.resolve(); } @@ -88,6 +90,28 @@ class FakeEventStore extends AnyEventEmitter implements IEventStore { if (operations) return []; return []; } + + setMaxListeners(number: number): EventEmitter { + return this.eventEmitter.setMaxListeners(number); + } + + on( + eventName: string | symbol, + listener: (...args: any[]) => void, + ): EventEmitter { + return this.eventEmitter.on(eventName, listener); + } + + emit(eventName: string | symbol, ...args: any[]): boolean { + return this.eventEmitter.emit(eventName, ...args); + } + + off( + eventName: string | symbol, + listener: (...args: any[]) => void, + ): EventEmitter { + return this.eventEmitter.off(eventName, listener); + } } module.exports = FakeEventStore;