diff --git a/src/lib/db/event-store.ts b/src/lib/db/event-store.ts index 26511fd99d..8adabcd61e 100644 --- a/src/lib/db/event-store.ts +++ b/src/lib/db/event-store.ts @@ -98,13 +98,9 @@ class EventStore implements IEventStore { async store(event: IBaseEvent): Promise { try { - const rows = await this.db(TABLE) + await this.db(TABLE) .insert(this.eventToDbRow(event)) .returning(EVENT_COLUMNS); - const savedEvent = this.rowToEvent(rows[0]); - process.nextTick(() => - this.eventEmitter.emit(event.type, savedEvent), - ); } catch (error: unknown) { this.logger.warn(`Failed to store "${event.type}" event: ${error}`); } @@ -148,13 +144,9 @@ class EventStore implements IEventStore { async batchStore(events: IBaseEvent[]): Promise { try { - const savedRows = await this.db(TABLE) + await this.db(TABLE) .insert(events.map(this.eventToDbRow)) .returning(EVENT_COLUMNS); - const savedEvents = savedRows.map(this.rowToEvent); - process.nextTick(() => - savedEvents.forEach((e) => this.eventEmitter.emit(e.type, e)), - ); } catch (error: unknown) { this.logger.warn(`Failed to store events: ${error}`); } @@ -411,6 +403,22 @@ class EventStore implements IEventStore { ): EventEmitter { return this.eventEmitter.off(eventName, listener); } + + private async setUnannouncedToAnnounced(): Promise { + const rows = await this.db(TABLE) + .update({ announced: true }) + .where('announced', false) + .whereNotNull('announced') + .returning(EVENT_COLUMNS); + + return rows.map(this.rowToEvent); + } + + async publishUnannouncedEvents(): Promise { + const events = await this.setUnannouncedToAnnounced(); + + events.forEach((e) => this.eventEmitter.emit(e.type, e)); + } } export default EventStore; diff --git a/src/lib/services/event-announcer-service.ts b/src/lib/services/event-announcer-service.ts new file mode 100644 index 0000000000..6cc5c5154d --- /dev/null +++ b/src/lib/services/event-announcer-service.ts @@ -0,0 +1,22 @@ +import { IUnleashConfig } from '../types/option'; +import { IUnleashStores } from '../types/stores'; +import { Logger } from '../logger'; +import { IEventStore } from '../types/stores/event-store'; + +export default class EventAnnouncer { + private logger: Logger; + + private eventStore: IEventStore; + + constructor( + { eventStore }: Pick, + { getLogger }: Pick, + ) { + this.logger = getLogger('services/event-service.ts'); + this.eventStore = eventStore; + } + + async publishUnannouncedEvents(): Promise { + return this.eventStore.publishUnannouncedEvents(); + } +} diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index 075747b166..57a1093ef3 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -58,6 +58,7 @@ import { } from '../features/change-request-access-service/createChangeRequestAccessReadModel'; import ConfigurationRevisionService from '../features/feature-toggle/configuration-revision-service'; import { createFeatureToggleService } from '../features'; +import EventAnnouncerService from './event-announcer-service'; // TODO: will be moved to scheduler feature directory export const scheduleServices = async ( @@ -72,6 +73,7 @@ export const scheduleServices = async ( projectHealthService, configurationRevisionService, maintenanceService, + eventAnnouncerService, } = services; if (await maintenanceService.isMaintenanceMode()) { @@ -116,6 +118,13 @@ export const scheduleServices = async ( ), secondsToMilliseconds(1), ); + + schedulerService.schedule( + eventAnnouncerService.publishUnannouncedEvents.bind( + eventAnnouncerService, + ), + secondsToMilliseconds(1), + ); }; export const createServices = ( @@ -240,10 +249,13 @@ export const createServices = ( schedulerService, ); + const eventAnnouncerService = new EventAnnouncerService(stores, config); + return { accessService, accountService, addonService, + eventAnnouncerService, featureToggleService: featureToggleServiceV2, featureToggleServiceV2, featureTypeService, diff --git a/src/lib/types/services.ts b/src/lib/types/services.ts index 9c9597bd72..36e85b3902 100644 --- a/src/lib/types/services.ts +++ b/src/lib/types/services.ts @@ -42,6 +42,7 @@ import { Knex } from 'knex'; import ExportImportService from '../features/export-import-toggles/export-import-service'; import { ISegmentService } from '../segments/segment-service-interface'; import ConfigurationRevisionService from '../features/feature-toggle/configuration-revision-service'; +import EventAnnouncerService from 'lib/services/event-announcer-service'; export interface IUnleashServices { accessService: AccessService; @@ -88,6 +89,7 @@ export interface IUnleashServices { exportImportService: ExportImportService; configurationRevisionService: ConfigurationRevisionService; schedulerService: SchedulerService; + eventAnnouncerService: EventAnnouncerService; transactionalExportImportService: ( db: Knex.Transaction, ) => ExportImportService; diff --git a/src/lib/types/stores/event-store.ts b/src/lib/types/stores/event-store.ts index ac53c6b887..307a6695df 100644 --- a/src/lib/types/stores/event-store.ts +++ b/src/lib/types/stores/event-store.ts @@ -7,6 +7,7 @@ import { IQueryOperations } from 'lib/db/event-store'; export interface IEventStore extends Store, Pick { + publishUnannouncedEvents(): Promise; store(event: IBaseEvent): Promise; batchStore(events: IBaseEvent[]): Promise; getEvents(): Promise; diff --git a/src/migrations/20230706123907-events-announced-column.js b/src/migrations/20230706123907-events-announced-column.js new file mode 100644 index 0000000000..f5cfb52370 --- /dev/null +++ b/src/migrations/20230706123907-events-announced-column.js @@ -0,0 +1,22 @@ +'use strict'; + +exports.up = function (db, cb) { + // mark existing events as announced, set the default to false for future events. + db.runSql( + ` + ALTER TABLE events + ADD COLUMN IF NOT EXISTS "announced" BOOLEAN DEFAULT TRUE, + ALTER COLUMN "announced" SET DEFAULT FALSE; + `, + cb(), + ); +}; + +exports.down = function (db, cb) { + db.runSql( + ` + ALTER TABLE events DROP COLUMN IF EXISTS "announced"; + `, + cb, + ); +}; diff --git a/src/test/e2e/stores/event-store.e2e.test.ts b/src/test/e2e/stores/event-store.e2e.test.ts index 72e14dce41..6c56769f43 100644 --- a/src/test/e2e/stores/event-store.e2e.test.ts +++ b/src/test/e2e/stores/event-store.e2e.test.ts @@ -43,7 +43,7 @@ test('Should include id and createdAt when saving', async () => { const seen: Array = []; eventStore.on(APPLICATION_CREATED, (e) => seen.push(e)); await eventStore.store(event1); - jest.advanceTimersByTime(100); + await eventStore.publishUnannouncedEvents(); expect(seen).toHaveLength(1); expect(seen[0].id).toBeTruthy(); expect(seen[0].createdAt).toBeTruthy(); @@ -74,6 +74,7 @@ test('Should include empty tags array for new event', async () => { // Trigger await eventStore.store(event); + await eventStore.publishUnannouncedEvents(); return promise; }); @@ -108,7 +109,7 @@ test('Should be able to store multiple events at once', async () => { const seen = []; eventStore.on(APPLICATION_CREATED, (e) => seen.push(e)); await eventStore.batchStore([event1, event2, event3]); - jest.advanceTimersByTime(100); + await eventStore.publishUnannouncedEvents(); expect(seen.length).toBe(3); seen.forEach((e) => { expect(e.id).toBeTruthy(); diff --git a/src/test/fixtures/fake-event-store.ts b/src/test/fixtures/fake-event-store.ts index d5dc3b0f39..7dcfc558af 100644 --- a/src/test/fixtures/fake-event-store.ts +++ b/src/test/fixtures/fake-event-store.ts @@ -121,6 +121,10 @@ class FakeEventStore implements IEventStore { ): EventEmitter { return this.eventEmitter.off(eventName, listener); } + + publishUnannouncedEvents(): Promise { + throw new Error('Method not implemented.'); + } } module.exports = FakeEventStore;