From d60e505a4090f81167f8bd08e5a102b4886cb66a Mon Sep 17 00:00:00 2001 From: Thomas Heartman Date: Mon, 10 Jul 2023 08:43:22 +0200 Subject: [PATCH] 1-1049 Emit events after db transaction is complete (#4174) This PR fixes an issue where events generated during a db transaction would get published before the transaction was complete. This caused errors in some of our services that expected the data to be stored before the transaction had been commited. Refer to [linear issue 1-1049](https://linear.app/unleash/issue/1-1049/event-emitter-should-emit-events-after-db-transaction-is-commited-not) for more info. Fixes 1-1049. ## Changes The most important change here is that the `eventStore` no longer emits events when they happen (because that can be in the middle of a transaction). Instead, events are stored with a new `announced` column. The new event announcer service runs on a schedule (every second) and publishes any new events that have not been published. Parts of the code have largely been lifted from the `client-application-store`, which uses a similar logic. I have kept the emitting of the event within the event store because a lot of other services listen to events from this store, so removing that would require a large rewrite. It's something we could look into down the line, but it seems like too much of a change to do right now. ## Discussion ### Terminology: Published vs announced? We should settle on one or the other. Announced is consistent with the client-application store, but published sounds more fitting for events. ### Publishing and marking events as published The current implementation fetches all events that haven't been marked as announced, sets them as announced, and then emits them. It's possible that Unleash would crash in the interim or something else might happen, causing the events not to get published. Maybe it would make sense to just fetch the events and only mark them as published after the announcement? On the other hand, that might get us into other problems. Any thoughts on this would be much appreciated. --- src/lib/db/event-store.ts | 28 ++++++++++++------- src/lib/services/event-announcer-service.ts | 22 +++++++++++++++ src/lib/services/index.ts | 12 ++++++++ src/lib/types/services.ts | 2 ++ src/lib/types/stores/event-store.ts | 1 + .../20230706123907-events-announced-column.js | 22 +++++++++++++++ src/test/e2e/stores/event-store.e2e.test.ts | 5 ++-- src/test/fixtures/fake-event-store.ts | 4 +++ 8 files changed, 84 insertions(+), 12 deletions(-) create mode 100644 src/lib/services/event-announcer-service.ts create mode 100644 src/migrations/20230706123907-events-announced-column.js diff --git a/src/lib/db/event-store.ts b/src/lib/db/event-store.ts index 26511fd99d..8adabcd61e 100644 --- a/src/lib/db/event-store.ts +++ b/src/lib/db/event-store.ts @@ -98,13 +98,9 @@ class EventStore implements IEventStore { async store(event: IBaseEvent): Promise { try { - const rows = await this.db(TABLE) + await this.db(TABLE) .insert(this.eventToDbRow(event)) .returning(EVENT_COLUMNS); - const savedEvent = this.rowToEvent(rows[0]); - process.nextTick(() => - this.eventEmitter.emit(event.type, savedEvent), - ); } catch (error: unknown) { this.logger.warn(`Failed to store "${event.type}" event: ${error}`); } @@ -148,13 +144,9 @@ class EventStore implements IEventStore { async batchStore(events: IBaseEvent[]): Promise { try { - const savedRows = await this.db(TABLE) + await this.db(TABLE) .insert(events.map(this.eventToDbRow)) .returning(EVENT_COLUMNS); - const savedEvents = savedRows.map(this.rowToEvent); - process.nextTick(() => - savedEvents.forEach((e) => this.eventEmitter.emit(e.type, e)), - ); } catch (error: unknown) { this.logger.warn(`Failed to store events: ${error}`); } @@ -411,6 +403,22 @@ class EventStore implements IEventStore { ): EventEmitter { return this.eventEmitter.off(eventName, listener); } + + private async setUnannouncedToAnnounced(): Promise { + const rows = await this.db(TABLE) + .update({ announced: true }) + .where('announced', false) + .whereNotNull('announced') + .returning(EVENT_COLUMNS); + + return rows.map(this.rowToEvent); + } + + async publishUnannouncedEvents(): Promise { + const events = await this.setUnannouncedToAnnounced(); + + events.forEach((e) => this.eventEmitter.emit(e.type, e)); + } } export default EventStore; diff --git a/src/lib/services/event-announcer-service.ts b/src/lib/services/event-announcer-service.ts new file mode 100644 index 0000000000..6cc5c5154d --- /dev/null +++ b/src/lib/services/event-announcer-service.ts @@ -0,0 +1,22 @@ +import { IUnleashConfig } from '../types/option'; +import { IUnleashStores } from '../types/stores'; +import { Logger } from '../logger'; +import { IEventStore } from '../types/stores/event-store'; + +export default class EventAnnouncer { + private logger: Logger; + + private eventStore: IEventStore; + + constructor( + { eventStore }: Pick, + { getLogger }: Pick, + ) { + this.logger = getLogger('services/event-service.ts'); + this.eventStore = eventStore; + } + + async publishUnannouncedEvents(): Promise { + return this.eventStore.publishUnannouncedEvents(); + } +} diff --git a/src/lib/services/index.ts b/src/lib/services/index.ts index 075747b166..57a1093ef3 100644 --- a/src/lib/services/index.ts +++ b/src/lib/services/index.ts @@ -58,6 +58,7 @@ import { } from '../features/change-request-access-service/createChangeRequestAccessReadModel'; import ConfigurationRevisionService from '../features/feature-toggle/configuration-revision-service'; import { createFeatureToggleService } from '../features'; +import EventAnnouncerService from './event-announcer-service'; // TODO: will be moved to scheduler feature directory export const scheduleServices = async ( @@ -72,6 +73,7 @@ export const scheduleServices = async ( projectHealthService, configurationRevisionService, maintenanceService, + eventAnnouncerService, } = services; if (await maintenanceService.isMaintenanceMode()) { @@ -116,6 +118,13 @@ export const scheduleServices = async ( ), secondsToMilliseconds(1), ); + + schedulerService.schedule( + eventAnnouncerService.publishUnannouncedEvents.bind( + eventAnnouncerService, + ), + secondsToMilliseconds(1), + ); }; export const createServices = ( @@ -240,10 +249,13 @@ export const createServices = ( schedulerService, ); + const eventAnnouncerService = new EventAnnouncerService(stores, config); + return { accessService, accountService, addonService, + eventAnnouncerService, featureToggleService: featureToggleServiceV2, featureToggleServiceV2, featureTypeService, diff --git a/src/lib/types/services.ts b/src/lib/types/services.ts index 9c9597bd72..36e85b3902 100644 --- a/src/lib/types/services.ts +++ b/src/lib/types/services.ts @@ -42,6 +42,7 @@ import { Knex } from 'knex'; import ExportImportService from '../features/export-import-toggles/export-import-service'; import { ISegmentService } from '../segments/segment-service-interface'; import ConfigurationRevisionService from '../features/feature-toggle/configuration-revision-service'; +import EventAnnouncerService from 'lib/services/event-announcer-service'; export interface IUnleashServices { accessService: AccessService; @@ -88,6 +89,7 @@ export interface IUnleashServices { exportImportService: ExportImportService; configurationRevisionService: ConfigurationRevisionService; schedulerService: SchedulerService; + eventAnnouncerService: EventAnnouncerService; transactionalExportImportService: ( db: Knex.Transaction, ) => ExportImportService; diff --git a/src/lib/types/stores/event-store.ts b/src/lib/types/stores/event-store.ts index ac53c6b887..307a6695df 100644 --- a/src/lib/types/stores/event-store.ts +++ b/src/lib/types/stores/event-store.ts @@ -7,6 +7,7 @@ import { IQueryOperations } from 'lib/db/event-store'; export interface IEventStore extends Store, Pick { + publishUnannouncedEvents(): Promise; store(event: IBaseEvent): Promise; batchStore(events: IBaseEvent[]): Promise; getEvents(): Promise; diff --git a/src/migrations/20230706123907-events-announced-column.js b/src/migrations/20230706123907-events-announced-column.js new file mode 100644 index 0000000000..f5cfb52370 --- /dev/null +++ b/src/migrations/20230706123907-events-announced-column.js @@ -0,0 +1,22 @@ +'use strict'; + +exports.up = function (db, cb) { + // mark existing events as announced, set the default to false for future events. + db.runSql( + ` + ALTER TABLE events + ADD COLUMN IF NOT EXISTS "announced" BOOLEAN DEFAULT TRUE, + ALTER COLUMN "announced" SET DEFAULT FALSE; + `, + cb(), + ); +}; + +exports.down = function (db, cb) { + db.runSql( + ` + ALTER TABLE events DROP COLUMN IF EXISTS "announced"; + `, + cb, + ); +}; diff --git a/src/test/e2e/stores/event-store.e2e.test.ts b/src/test/e2e/stores/event-store.e2e.test.ts index 72e14dce41..6c56769f43 100644 --- a/src/test/e2e/stores/event-store.e2e.test.ts +++ b/src/test/e2e/stores/event-store.e2e.test.ts @@ -43,7 +43,7 @@ test('Should include id and createdAt when saving', async () => { const seen: Array = []; eventStore.on(APPLICATION_CREATED, (e) => seen.push(e)); await eventStore.store(event1); - jest.advanceTimersByTime(100); + await eventStore.publishUnannouncedEvents(); expect(seen).toHaveLength(1); expect(seen[0].id).toBeTruthy(); expect(seen[0].createdAt).toBeTruthy(); @@ -74,6 +74,7 @@ test('Should include empty tags array for new event', async () => { // Trigger await eventStore.store(event); + await eventStore.publishUnannouncedEvents(); return promise; }); @@ -108,7 +109,7 @@ test('Should be able to store multiple events at once', async () => { const seen = []; eventStore.on(APPLICATION_CREATED, (e) => seen.push(e)); await eventStore.batchStore([event1, event2, event3]); - jest.advanceTimersByTime(100); + await eventStore.publishUnannouncedEvents(); expect(seen.length).toBe(3); seen.forEach((e) => { expect(e.id).toBeTruthy(); diff --git a/src/test/fixtures/fake-event-store.ts b/src/test/fixtures/fake-event-store.ts index d5dc3b0f39..7dcfc558af 100644 --- a/src/test/fixtures/fake-event-store.ts +++ b/src/test/fixtures/fake-event-store.ts @@ -121,6 +121,10 @@ class FakeEventStore implements IEventStore { ): EventEmitter { return this.eventEmitter.off(eventName, listener); } + + publishUnannouncedEvents(): Promise { + throw new Error('Method not implemented.'); + } } module.exports = FakeEventStore;