mirror of
https://github.com/Unleash/unleash.git
synced 2025-03-04 00:18:40 +01:00
## About the changes In our staging setup, we create ad-hoc environments and import Unleash state from production. After unleash is deployed on such environment, the import job kicks in and feeds Unleash instance with feature flags from production. Between Unleash being up and running and the import job running, some applications start polling Unleash. They get an empty feature toggle list with `meta.revisionId=0`. Then apps use this as part of `eTag` header in subsequent requests. Even though after import Unleash server finally has toggles to serve, it doesn't because it calculates _max revision id_ based on toggle updates (not null `feature_name` column in query) or `SEGMENT_UPDATED`. This change adds an extra condition to query so feature toggles import is considered something that should invalidate the cache.
432 lines
12 KiB
TypeScript
432 lines
12 KiB
TypeScript
import {
|
|
IEvent,
|
|
IBaseEvent,
|
|
SEGMENT_UPDATED,
|
|
FEATURE_IMPORT,
|
|
FEATURES_IMPORTED,
|
|
IEventType,
|
|
} from '../types/events';
|
|
import { LogProvider, Logger } from '../logger';
|
|
import { IEventStore } from '../types/stores/event-store';
|
|
import { ITag } from '../types/model';
|
|
import { SearchEventsSchema } from '../openapi/spec/search-events-schema';
|
|
import { sharedEventEmitter } from '../util/anyEventEmitter';
|
|
import { Db } from './db';
|
|
import { Knex } from 'knex';
|
|
import EventEmitter from 'events';
|
|
|
|
const EVENT_COLUMNS = [
|
|
'id',
|
|
'type',
|
|
'created_by',
|
|
'created_at',
|
|
'data',
|
|
'pre_data',
|
|
'tags',
|
|
'feature_name',
|
|
'project',
|
|
'environment',
|
|
] as const;
|
|
|
|
export type IQueryOperations =
|
|
| IWhereOperation
|
|
| IBeforeDateOperation
|
|
| IBetweenDatesOperation
|
|
| IForFeaturesOperation;
|
|
|
|
interface IWhereOperation {
|
|
op: 'where';
|
|
parameters: {
|
|
[key: string]: string;
|
|
};
|
|
}
|
|
|
|
interface IBeforeDateOperation {
|
|
op: 'beforeDate';
|
|
parameters: {
|
|
dateAccessor: string;
|
|
date: string;
|
|
};
|
|
}
|
|
|
|
interface IBetweenDatesOperation {
|
|
op: 'betweenDate';
|
|
parameters: {
|
|
dateAccessor: string;
|
|
range: string[];
|
|
};
|
|
}
|
|
|
|
interface IForFeaturesOperation {
|
|
op: 'forFeatures';
|
|
parameters: IForFeaturesParams;
|
|
}
|
|
|
|
interface IForFeaturesParams {
|
|
type: string;
|
|
projectId: string;
|
|
environments: string[];
|
|
features: string[];
|
|
}
|
|
|
|
export interface IEventTable {
|
|
id: number;
|
|
type: string;
|
|
created_by: string;
|
|
created_at: Date;
|
|
data?: any;
|
|
pre_data?: any;
|
|
feature_name?: string;
|
|
project?: string;
|
|
environment?: string;
|
|
tags: ITag[];
|
|
}
|
|
|
|
const TABLE = 'events';
|
|
|
|
class EventStore implements IEventStore {
|
|
private db: Db;
|
|
|
|
// only one shared event emitter should exist across all event store instances
|
|
private eventEmitter: EventEmitter = sharedEventEmitter;
|
|
|
|
private logger: Logger;
|
|
|
|
// a new DB has to be injected per transaction
|
|
constructor(db: Db, getLogger: LogProvider) {
|
|
this.db = db;
|
|
this.logger = getLogger('lib/db/event-store.ts');
|
|
}
|
|
|
|
async store(event: IBaseEvent): Promise<void> {
|
|
try {
|
|
await this.db(TABLE)
|
|
.insert(this.eventToDbRow(event))
|
|
.returning(EVENT_COLUMNS);
|
|
} catch (error: unknown) {
|
|
this.logger.warn(`Failed to store "${event.type}" event: ${error}`);
|
|
}
|
|
}
|
|
|
|
async count(): Promise<number> {
|
|
const count = await this.db(TABLE)
|
|
.count<Record<string, number>>()
|
|
.first();
|
|
if (!count) {
|
|
return 0;
|
|
}
|
|
if (typeof count.count === 'string') {
|
|
return parseInt(count.count, 10);
|
|
} else {
|
|
return count.count;
|
|
}
|
|
}
|
|
|
|
async filteredCount(eventSearch: SearchEventsSchema): Promise<number> {
|
|
let query = this.db(TABLE);
|
|
if (eventSearch.type) {
|
|
query = query.andWhere({ type: eventSearch.type });
|
|
}
|
|
if (eventSearch.project) {
|
|
query = query.andWhere({ project: eventSearch.project });
|
|
}
|
|
if (eventSearch.feature) {
|
|
query = query.andWhere({ feature_name: eventSearch.feature });
|
|
}
|
|
const count = await query.count().first();
|
|
if (!count) {
|
|
return 0;
|
|
}
|
|
if (typeof count.count === 'string') {
|
|
return parseInt(count.count, 10);
|
|
} else {
|
|
return count.count;
|
|
}
|
|
}
|
|
|
|
async batchStore(events: IBaseEvent[]): Promise<void> {
|
|
try {
|
|
await this.db(TABLE)
|
|
.insert(events.map(this.eventToDbRow))
|
|
.returning(EVENT_COLUMNS);
|
|
} catch (error: unknown) {
|
|
this.logger.warn(`Failed to store events: ${error}`);
|
|
}
|
|
}
|
|
|
|
async getMaxRevisionId(largerThan: number = 0): Promise<number> {
|
|
const row = await this.db(TABLE)
|
|
.max('id')
|
|
.where((builder) =>
|
|
builder
|
|
.whereNotNull('feature_name')
|
|
.orWhereIn('type', [
|
|
SEGMENT_UPDATED,
|
|
FEATURE_IMPORT,
|
|
FEATURES_IMPORTED,
|
|
]),
|
|
)
|
|
.andWhere('id', '>=', largerThan)
|
|
.first();
|
|
return row?.max ?? 0;
|
|
}
|
|
|
|
async delete(key: number): Promise<void> {
|
|
await this.db(TABLE).where({ id: key }).del();
|
|
}
|
|
|
|
async deleteAll(): Promise<void> {
|
|
await this.db(TABLE).del();
|
|
}
|
|
|
|
destroy(): void {}
|
|
|
|
async exists(key: number): Promise<boolean> {
|
|
const result = await this.db.raw(
|
|
`SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE id = ?) AS present`,
|
|
[key],
|
|
);
|
|
const { present } = result.rows[0];
|
|
return present;
|
|
}
|
|
|
|
async query(operations: IQueryOperations[]): Promise<IEvent[]> {
|
|
try {
|
|
let query: Knex.QueryBuilder = this.select();
|
|
|
|
operations.forEach((operation) => {
|
|
if (operation.op === 'where') {
|
|
query = this.where(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'forFeatures') {
|
|
query = this.forFeatures(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'beforeDate') {
|
|
query = this.beforeDate(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'betweenDate') {
|
|
query = this.betweenDate(query, operation.parameters);
|
|
}
|
|
});
|
|
|
|
const rows = await query;
|
|
return rows.map(this.rowToEvent);
|
|
} catch (e) {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
async queryCount(operations: IQueryOperations[]): Promise<number> {
|
|
try {
|
|
let query: Knex.QueryBuilder = this.db.count().from(TABLE);
|
|
|
|
operations.forEach((operation) => {
|
|
if (operation.op === 'where') {
|
|
query = this.where(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'forFeatures') {
|
|
query = this.forFeatures(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'beforeDate') {
|
|
query = this.beforeDate(query, operation.parameters);
|
|
}
|
|
|
|
if (operation.op === 'betweenDate') {
|
|
query = this.betweenDate(query, operation.parameters);
|
|
}
|
|
});
|
|
|
|
const queryResult = await query.first();
|
|
return parseInt(queryResult.count || 0);
|
|
} catch (e) {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
where(
|
|
query: Knex.QueryBuilder,
|
|
parameters: { [key: string]: string },
|
|
): Knex.QueryBuilder {
|
|
return query.where(parameters);
|
|
}
|
|
|
|
beforeDate(
|
|
query: Knex.QueryBuilder,
|
|
parameters: { dateAccessor: string; date: string },
|
|
): Knex.QueryBuilder {
|
|
return query.andWhere(parameters.dateAccessor, '>=', parameters.date);
|
|
}
|
|
|
|
betweenDate(
|
|
query: Knex.QueryBuilder,
|
|
parameters: { dateAccessor: string; range: string[] },
|
|
): Knex.QueryBuilder {
|
|
if (parameters.range && parameters.range.length === 2) {
|
|
return query.andWhereBetween(parameters.dateAccessor, [
|
|
parameters.range[0],
|
|
parameters.range[1],
|
|
]);
|
|
}
|
|
|
|
return query;
|
|
}
|
|
|
|
select(): Knex.QueryBuilder {
|
|
return this.db.select(EVENT_COLUMNS).from(TABLE);
|
|
}
|
|
|
|
forFeatures(
|
|
query: Knex.QueryBuilder,
|
|
parameters: IForFeaturesParams,
|
|
): Knex.QueryBuilder {
|
|
return query
|
|
.where({ type: parameters.type, project: parameters.projectId })
|
|
.whereIn('feature_name', parameters.features)
|
|
.whereIn('environment', parameters.environments);
|
|
}
|
|
|
|
async get(key: number): Promise<IEvent> {
|
|
const row = await this.db(TABLE).where({ id: key }).first();
|
|
return this.rowToEvent(row);
|
|
}
|
|
|
|
async getAll(query?: Object): Promise<IEvent[]> {
|
|
return this.getEvents(query);
|
|
}
|
|
|
|
async getEvents(query?: Object): Promise<IEvent[]> {
|
|
try {
|
|
let qB = this.db
|
|
.select(EVENT_COLUMNS)
|
|
.from(TABLE)
|
|
.limit(100)
|
|
.orderBy('created_at', 'desc');
|
|
if (query) {
|
|
qB = qB.where(query);
|
|
}
|
|
const rows = await qB;
|
|
return rows.map(this.rowToEvent);
|
|
} catch (err) {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
async searchEvents(search: SearchEventsSchema = {}): Promise<IEvent[]> {
|
|
let query = this.db
|
|
.select(EVENT_COLUMNS)
|
|
.from<IEventTable>(TABLE)
|
|
.limit(search.limit ?? 100)
|
|
.offset(search.offset ?? 0)
|
|
.orderBy('created_at', 'desc');
|
|
|
|
if (search.type) {
|
|
query = query.andWhere({
|
|
type: search.type,
|
|
});
|
|
}
|
|
|
|
if (search.project) {
|
|
query = query.andWhere({
|
|
project: search.project,
|
|
});
|
|
}
|
|
|
|
if (search.feature) {
|
|
query = query.andWhere({
|
|
feature_name: search.feature,
|
|
});
|
|
}
|
|
|
|
if (search.query) {
|
|
query = query.where((where) =>
|
|
where
|
|
.orWhereRaw('type::text ILIKE ?', `%${search.query}%`)
|
|
.orWhereRaw('created_by::text ILIKE ?', `%${search.query}%`)
|
|
.orWhereRaw('data::text ILIKE ?', `%${search.query}%`)
|
|
.orWhereRaw('tags::text ILIKE ?', `%${search.query}%`)
|
|
.orWhereRaw('pre_data::text ILIKE ?', `%${search.query}%`),
|
|
);
|
|
}
|
|
|
|
try {
|
|
return (await query).map(this.rowToEvent);
|
|
} catch (err) {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
rowToEvent(row: IEventTable): IEvent {
|
|
return {
|
|
id: row.id,
|
|
type: row.type as IEventType,
|
|
createdBy: row.created_by,
|
|
createdAt: row.created_at,
|
|
data: row.data,
|
|
preData: row.pre_data,
|
|
tags: row.tags || [],
|
|
featureName: row.feature_name,
|
|
project: row.project,
|
|
environment: row.environment,
|
|
};
|
|
}
|
|
|
|
eventToDbRow(e: IBaseEvent): Omit<IEventTable, 'id' | 'created_at'> {
|
|
return {
|
|
type: e.type,
|
|
created_by: e.createdBy ?? 'admin',
|
|
data: Array.isArray(e.data) ? JSON.stringify(e.data) : e.data,
|
|
pre_data: Array.isArray(e.preData)
|
|
? JSON.stringify(e.preData)
|
|
: e.preData,
|
|
// @ts-expect-error workaround for json-array
|
|
tags: JSON.stringify(e.tags),
|
|
feature_name: e.featureName,
|
|
project: e.project,
|
|
environment: e.environment,
|
|
};
|
|
}
|
|
|
|
setMaxListeners(number: number): EventEmitter {
|
|
return this.eventEmitter.setMaxListeners(number);
|
|
}
|
|
|
|
on(
|
|
eventName: string | symbol,
|
|
listener: (...args: any[]) => void,
|
|
): EventEmitter {
|
|
return this.eventEmitter.on(eventName, listener);
|
|
}
|
|
|
|
emit(eventName: string | symbol, ...args: any[]): boolean {
|
|
return this.eventEmitter.emit(eventName, ...args);
|
|
}
|
|
|
|
off(
|
|
eventName: string | symbol,
|
|
listener: (...args: any[]) => void,
|
|
): EventEmitter {
|
|
return this.eventEmitter.off(eventName, listener);
|
|
}
|
|
|
|
async setUnannouncedToAnnounced(): Promise<IEvent[]> {
|
|
const rows = await this.db(TABLE)
|
|
.update({ announced: true })
|
|
.where('announced', false)
|
|
.returning(EVENT_COLUMNS);
|
|
return rows.map(this.rowToEvent);
|
|
}
|
|
|
|
async publishUnannouncedEvents(): Promise<void> {
|
|
const events = await this.setUnannouncedToAnnounced();
|
|
|
|
events.forEach((e) => this.eventEmitter.emit(e.type, e));
|
|
}
|
|
}
|
|
|
|
export default EventStore;
|