import { IEvent, IBaseEvent } from '../types/events'; import { LogProvider, Logger } from '../logger'; import { IEventStore } from '../types/stores/event-store'; import { ITag } from '../types/model'; import { SearchEventsSchema } from '../openapi/spec/search-events-schema'; import { sharedEventEmitter } from '../util/anyEventEmitter'; import { Db } from './db'; import { Knex } from 'knex'; import EventEmitter from 'events'; const EVENT_COLUMNS = [ 'id', 'type', 'created_by', 'created_at', 'data', 'pre_data', 'tags', 'feature_name', 'project', 'environment', ] as const; export type IQueryOperations = | IWhereOperation | IBeforeDateOperation | IBetweenDatesOperation | IForFeaturesOperation; interface IWhereOperation { op: 'where'; parameters: { [key: string]: string; }; } interface IBeforeDateOperation { op: 'beforeDate'; parameters: { dateAccessor: string; date: string; }; } interface IBetweenDatesOperation { op: 'betweenDate'; parameters: { dateAccessor: string; range: string[]; }; } interface IForFeaturesOperation { op: 'forFeatures'; parameters: IForFeaturesParams; } interface IForFeaturesParams { type: string; projectId: string; environments: string[]; features: string[]; } export interface IEventTable { id: number; type: string; created_by: string; created_at: Date; data?: any; pre_data?: any; feature_name?: string; project?: string; environment?: string; tags: ITag[]; } const TABLE = 'events'; class EventStore implements IEventStore { private db: Db; // only one shared event emitter should exist across all event store instances private eventEmitter: EventEmitter = sharedEventEmitter; private logger: Logger; // a new DB has to be injected per transaction constructor(db: Db, getLogger: LogProvider) { this.db = db; this.logger = getLogger('lib/db/event-store.ts'); } async store(event: IBaseEvent): Promise { try { const rows = await this.db(TABLE) .insert(this.eventToDbRow(event)) .returning(EVENT_COLUMNS); const savedEvent = this.rowToEvent(rows[0]); process.nextTick(() => this.eventEmitter.emit(event.type, savedEvent), ); } catch (error: unknown) { this.logger.warn(`Failed to store "${event.type}" event: ${error}`); } } async count(): Promise { let count = await this.db(TABLE) .count>() .first(); if (typeof count.count === 'string') { return parseInt(count.count, 10); } else { return count.count; } } async filteredCount(eventSearch: SearchEventsSchema): Promise { let query = this.db(TABLE); if (eventSearch.type) { query = query.andWhere({ type: eventSearch.type }); } if (eventSearch.project) { query = query.andWhere({ project: eventSearch.project }); } if (eventSearch.feature) { query = query.andWhere({ feature_name: eventSearch.feature }); } let count = await query.count().first(); if (typeof count.count === 'string') { return parseInt(count.count, 10); } else { return count.count; } } async batchStore(events: IBaseEvent[]): Promise { try { const savedRows = await this.db(TABLE) .insert(events.map(this.eventToDbRow)) .returning(EVENT_COLUMNS); const savedEvents = savedRows.map(this.rowToEvent); process.nextTick(() => savedEvents.forEach((e) => this.eventEmitter.emit(e.type, e)), ); } catch (error: unknown) { this.logger.warn(`Failed to store events: ${error}`); } } async delete(key: number): Promise { await this.db(TABLE).where({ id: key }).del(); } async deleteAll(): Promise { await this.db(TABLE).del(); } destroy(): void {} async exists(key: number): Promise { const result = await this.db.raw( `SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE id = ?) AS present`, [key], ); const { present } = result.rows[0]; return present; } async query(operations: IQueryOperations[]): Promise { try { let query: Knex.QueryBuilder = this.select(); operations.forEach((operation) => { if (operation.op === 'where') { query = this.where(query, operation.parameters); } if (operation.op === 'forFeatures') { query = this.forFeatures(query, operation.parameters); } if (operation.op === 'beforeDate') { query = this.beforeDate(query, operation.parameters); } if (operation.op === 'betweenDate') { query = this.betweenDate(query, operation.parameters); } }); const rows = await query; return rows.map(this.rowToEvent); } catch (e) { return []; } } where( query: Knex.QueryBuilder, parameters: { [key: string]: string }, ): Knex.QueryBuilder { return query.where(parameters); } beforeDate( query: Knex.QueryBuilder, parameters: { dateAccessor: string; date: string }, ): Knex.QueryBuilder { return query.andWhere(parameters.dateAccessor, '>=', parameters.date); } betweenDate( query: Knex.QueryBuilder, parameters: { dateAccessor: string; range: string[] }, ): Knex.QueryBuilder { if (parameters.range && parameters.range.length === 2) { return query.andWhereBetween(parameters.dateAccessor, [ parameters.range[0], parameters.range[1], ]); } return query; } select(): Knex.QueryBuilder { return this.db.select(EVENT_COLUMNS).from(TABLE); } forFeatures( query: Knex.QueryBuilder, parameters: IForFeaturesParams, ): Knex.QueryBuilder { return query .where({ type: parameters.type, project: parameters.projectId }) .whereIn('feature_name', parameters.features) .whereIn('environment', parameters.environments); } async get(key: number): Promise { const row = await this.db(TABLE).where({ id: key }).first(); return this.rowToEvent(row); } async getAll(query?: Object): Promise { return this.getEvents(query); } async getEvents(query?: Object): Promise { try { let qB = this.db .select(EVENT_COLUMNS) .from(TABLE) .limit(100) .orderBy('created_at', 'desc'); if (query) { qB = qB.where(query); } const rows = await qB; return rows.map(this.rowToEvent); } catch (err) { return []; } } async searchEvents(search: SearchEventsSchema = {}): Promise { let query = this.db .select(EVENT_COLUMNS) .from(TABLE) .limit(search.limit ?? 100) .offset(search.offset ?? 0) .orderBy('created_at', 'desc'); if (search.type) { query = query.andWhere({ type: search.type, }); } if (search.project) { query = query.andWhere({ project: search.project, }); } if (search.feature) { query = query.andWhere({ feature_name: search.feature, }); } if (search.query) { query = query.where((where) => where .orWhereRaw('type::text ILIKE ?', `%${search.query}%`) .orWhereRaw('created_by::text ILIKE ?', `%${search.query}%`) .orWhereRaw('data::text ILIKE ?', `%${search.query}%`) .orWhereRaw('pre_data::text ILIKE ?', `%${search.query}%`), ); } try { return (await query).map(this.rowToEvent); } catch (err) { return []; } } rowToEvent(row: IEventTable): IEvent { return { id: row.id, type: row.type, createdBy: row.created_by, createdAt: row.created_at, data: row.data, preData: row.pre_data, tags: row.tags || [], featureName: row.feature_name, project: row.project, environment: row.environment, }; } eventToDbRow(e: IBaseEvent): Omit { return { type: e.type, created_by: e.createdBy, data: e.data, pre_data: e.preData, // @ts-expect-error workaround for json-array tags: JSON.stringify(e.tags), feature_name: e.featureName, project: e.project, environment: e.environment, }; } setMaxListeners(number: number): EventEmitter { return this.eventEmitter.setMaxListeners(number); } on( eventName: string | symbol, listener: (...args: any[]) => void, ): EventEmitter { return this.eventEmitter.on(eventName, listener); } emit(eventName: string | symbol, ...args: any[]): boolean { return this.eventEmitter.emit(eventName, ...args); } off( eventName: string | symbol, listener: (...args: any[]) => void, ): EventEmitter { return this.eventEmitter.off(eventName, listener); } } export default EventStore;