mirror of
https://github.com/Unleash/unleash.git
synced 2025-01-20 00:08:02 +01:00
chore: events created by userid migration (#6027)
## About the changes Schedules a best-effort task setting the value of events.created_by_user_id based on what is found in the created_by column and if it's capable of resolving that to a userid/a system id. The process is executed in the events-store, it takes a chunk of events that haven't been processed yet, attempts to join users and api_tokens tables on created_by = username/email, loops through and tries to figure out an id to set. Then updates the record. --------- Co-authored-by: Gastón Fournier <gaston@getunleash.io>
This commit is contained in:
parent
6f4f8661c7
commit
9d2c65c9c0
@ -45,8 +45,8 @@ export const createStores = (
|
|||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
db: Db,
|
db: Db,
|
||||||
): IUnleashStores => {
|
): IUnleashStores => {
|
||||||
const { getLogger, eventBus } = config;
|
const { getLogger, eventBus, flagResolver } = config;
|
||||||
const eventStore = new EventStore(db, getLogger);
|
const eventStore = new EventStore(db, getLogger, flagResolver);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
eventStore,
|
eventStore,
|
||||||
|
@ -10,7 +10,11 @@ export const createEventsService: (
|
|||||||
db: Db,
|
db: Db,
|
||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
) => EventService = (db, config) => {
|
) => EventService = (db, config) => {
|
||||||
const eventStore = new EventStore(db, config.getLogger);
|
const eventStore = new EventStore(
|
||||||
|
db,
|
||||||
|
config.getLogger,
|
||||||
|
config.flagResolver,
|
||||||
|
);
|
||||||
const featureTagStore = new FeatureTagStore(
|
const featureTagStore = new FeatureTagStore(
|
||||||
db,
|
db,
|
||||||
config.eventBus,
|
config.eventBus,
|
||||||
|
123
src/lib/features/events/event-created-by-migration.test.ts
Normal file
123
src/lib/features/events/event-created-by-migration.test.ts
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
import EventStore from './event-store';
|
||||||
|
import getLogger from '../../../test/fixtures/no-logger';
|
||||||
|
import dbInit, { ITestDb } from '../../../test/e2e/helpers/database-init';
|
||||||
|
import { defaultExperimentalOptions } from '../../types/experimental';
|
||||||
|
import FlagResolver from '../../util/flag-resolver';
|
||||||
|
|
||||||
|
let db: ITestDb;
|
||||||
|
let resolver: FlagResolver;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
resolver = new FlagResolver({
|
||||||
|
...defaultExperimentalOptions,
|
||||||
|
flags: { createdByUserIdDataMigration: true },
|
||||||
|
});
|
||||||
|
db = await dbInit('events_test', getLogger);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await db.rawDatabase('events').del();
|
||||||
|
await db.rawDatabase('users').del();
|
||||||
|
await db.destroy();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('sets created_by_user_id on events with user username/email set as created_by', async () => {
|
||||||
|
const store = new EventStore(db.rawDatabase, getLogger, resolver);
|
||||||
|
|
||||||
|
await db.rawDatabase('users').insert({ username: 'test1' });
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'feature-created',
|
||||||
|
created_by: 'test1',
|
||||||
|
feature_name: `feature1`,
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await store.setCreatedByUserId(200);
|
||||||
|
|
||||||
|
const user = await db
|
||||||
|
.rawDatabase('users')
|
||||||
|
.where({ username: 'test1' })
|
||||||
|
.first('id');
|
||||||
|
|
||||||
|
const events = await db.rawDatabase('events').select('*');
|
||||||
|
const notSet = events.filter(
|
||||||
|
(e) => !e.created_by_user_id && e.data.test === 'data-migrate',
|
||||||
|
);
|
||||||
|
const test1 = events.filter(
|
||||||
|
(e) =>
|
||||||
|
e.created_by_user_id === user.id && e.data.test === 'data-migrate',
|
||||||
|
);
|
||||||
|
expect(notSet).toHaveLength(0);
|
||||||
|
expect(test1).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('sets created_by_user_id on a mix of events and created_bys', async () => {
|
||||||
|
const store = new EventStore(db.rawDatabase, getLogger, resolver);
|
||||||
|
|
||||||
|
await db.rawDatabase('users').insert({ username: 'test2' });
|
||||||
|
|
||||||
|
await db.rawDatabase('api_tokens').insert({
|
||||||
|
secret: 'token1',
|
||||||
|
username: 'adm-token',
|
||||||
|
type: 'admin',
|
||||||
|
environment: 'default',
|
||||||
|
token_name: 'admin-token',
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'feature-created',
|
||||||
|
created_by: 'test2',
|
||||||
|
feature_name: `feature1`,
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'strategy-created',
|
||||||
|
created_by: 'migration',
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'api-token-created',
|
||||||
|
created_by: 'init-api-tokens',
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'application-created',
|
||||||
|
created_by: '::1',
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'feature-created',
|
||||||
|
created_by: 'unknown',
|
||||||
|
feature_name: `feature2`,
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await db.rawDatabase('events').insert({
|
||||||
|
type: 'feature-created',
|
||||||
|
created_by: 'adm-token',
|
||||||
|
feature_name: `feature3`,
|
||||||
|
data: `{"test": "data-migrate"}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await store.setCreatedByUserId(200);
|
||||||
|
|
||||||
|
const user = await db
|
||||||
|
.rawDatabase('users')
|
||||||
|
.where({ username: 'test2' })
|
||||||
|
.first('id');
|
||||||
|
|
||||||
|
const events = await db.rawDatabase('events').select('*');
|
||||||
|
const notSet = events.filter(
|
||||||
|
(e) => !e.created_by_user_id && e.data.test === 'data-migrate',
|
||||||
|
);
|
||||||
|
const test = events.filter(
|
||||||
|
(e) =>
|
||||||
|
e.created_by_user_id === user.id && e.data.test === 'data-migrate',
|
||||||
|
);
|
||||||
|
expect(notSet).toHaveLength(1);
|
||||||
|
expect(test).toHaveLength(1);
|
||||||
|
});
|
@ -134,4 +134,8 @@ export default class EventService {
|
|||||||
}
|
}
|
||||||
return this.eventStore.batchStore(enhancedEvents);
|
return this.eventStore.batchStore(enhancedEvents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async setEventCreatedByUserId(): Promise<void> {
|
||||||
|
return this.eventStore.setCreatedByUserId(100);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,13 @@ import EventStore from './event-store';
|
|||||||
import getLogger from '../../../test/fixtures/no-logger';
|
import getLogger from '../../../test/fixtures/no-logger';
|
||||||
import { subHours, formatRFC3339 } from 'date-fns';
|
import { subHours, formatRFC3339 } from 'date-fns';
|
||||||
import dbInit from '../../../test/e2e/helpers/database-init';
|
import dbInit from '../../../test/e2e/helpers/database-init';
|
||||||
|
import { defaultExperimentalOptions } from '../../types/experimental';
|
||||||
|
import FlagResolver from '../../util/flag-resolver';
|
||||||
|
|
||||||
|
let resolver: FlagResolver;
|
||||||
|
|
||||||
beforeAll(() => {
|
beforeAll(() => {
|
||||||
|
resolver = new FlagResolver(defaultExperimentalOptions);
|
||||||
getLogger.setMuteError(true);
|
getLogger.setMuteError(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -16,7 +21,7 @@ test('Trying to get events if db fails should yield empty list', async () => {
|
|||||||
const db = knex({
|
const db = knex({
|
||||||
client: 'pg',
|
client: 'pg',
|
||||||
});
|
});
|
||||||
const store = new EventStore(db, getLogger);
|
const store = new EventStore(db, getLogger, resolver);
|
||||||
const events = await store.getEvents();
|
const events = await store.getEvents();
|
||||||
expect(events.length).toBe(0);
|
expect(events.length).toBe(0);
|
||||||
await db.destroy();
|
await db.destroy();
|
||||||
@ -26,7 +31,7 @@ test('Trying to get events by name if db fails should yield empty list', async (
|
|||||||
const db = knex({
|
const db = knex({
|
||||||
client: 'pg',
|
client: 'pg',
|
||||||
});
|
});
|
||||||
const store = new EventStore(db, getLogger);
|
const store = new EventStore(db, getLogger, resolver);
|
||||||
const events = await store.searchEvents({ type: 'application-created' });
|
const events = await store.searchEvents({ type: 'application-created' });
|
||||||
expect(events).toBeTruthy();
|
expect(events).toBeTruthy();
|
||||||
expect(events.length).toBe(0);
|
expect(events.length).toBe(0);
|
||||||
@ -46,7 +51,7 @@ test('Find unannounced events returns all events', async () => {
|
|||||||
}));
|
}));
|
||||||
await db.rawDatabase('events').insert(allEvents).returning(['id']);
|
await db.rawDatabase('events').insert(allEvents).returning(['id']);
|
||||||
|
|
||||||
const store = new EventStore(db.rawDatabase, getLogger);
|
const store = new EventStore(db.rawDatabase, getLogger, resolver);
|
||||||
const events = await store.setUnannouncedToAnnounced();
|
const events = await store.setUnannouncedToAnnounced();
|
||||||
expect(events).toBeTruthy();
|
expect(events).toBeTruthy();
|
||||||
expect(events.length).toBe(505);
|
expect(events.length).toBe(505);
|
||||||
|
@ -14,6 +14,7 @@ import { sharedEventEmitter } from '../../util/anyEventEmitter';
|
|||||||
import { Db } from '../../db/db';
|
import { Db } from '../../db/db';
|
||||||
import { Knex } from 'knex';
|
import { Knex } from 'knex';
|
||||||
import EventEmitter from 'events';
|
import EventEmitter from 'events';
|
||||||
|
import { ADMIN_TOKEN_USER, IFlagResolver, SYSTEM_USER_ID } from '../../types';
|
||||||
|
|
||||||
const EVENT_COLUMNS = [
|
const EVENT_COLUMNS = [
|
||||||
'id',
|
'id',
|
||||||
@ -92,12 +93,15 @@ class EventStore implements IEventStore {
|
|||||||
// only one shared event emitter should exist across all event store instances
|
// only one shared event emitter should exist across all event store instances
|
||||||
private eventEmitter: EventEmitter = sharedEventEmitter;
|
private eventEmitter: EventEmitter = sharedEventEmitter;
|
||||||
|
|
||||||
|
private flagResolver: IFlagResolver;
|
||||||
|
|
||||||
private logger: Logger;
|
private logger: Logger;
|
||||||
|
|
||||||
// a new DB has to be injected per transaction
|
// a new DB has to be injected per transaction
|
||||||
constructor(db: Db, getLogger: LogProvider) {
|
constructor(db: Db, getLogger: LogProvider, flagResolver: IFlagResolver) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
this.logger = getLogger('event-store');
|
this.logger = getLogger('event-store');
|
||||||
|
this.flagResolver = flagResolver;
|
||||||
}
|
}
|
||||||
|
|
||||||
async store(event: IBaseEvent): Promise<void> {
|
async store(event: IBaseEvent): Promise<void> {
|
||||||
@ -428,6 +432,59 @@ class EventStore implements IEventStore {
|
|||||||
|
|
||||||
events.forEach((e) => this.eventEmitter.emit(e.type, e));
|
events.forEach((e) => this.eventEmitter.emit(e.type, e));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async setCreatedByUserId(batchSize: number): Promise<void> {
|
||||||
|
const API_TOKEN_TABLE = 'api_tokens';
|
||||||
|
|
||||||
|
if (!this.flagResolver.isEnabled('createdByUserIdDataMigration')) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const toUpdate = await this.db(`${TABLE} as e`)
|
||||||
|
.joinRaw(
|
||||||
|
`LEFT OUTER JOIN users AS u ON e.created_by = u.username OR e.created_by = u.email`,
|
||||||
|
)
|
||||||
|
.joinRaw(
|
||||||
|
`LEFT OUTER JOIN ${API_TOKEN_TABLE} AS t on e.created_by = t.username`,
|
||||||
|
)
|
||||||
|
.whereRaw(
|
||||||
|
`e.created_by_user_id IS null AND
|
||||||
|
e.created_by IS NOT null AND
|
||||||
|
(u.id IS NOT null OR
|
||||||
|
t.username IS NOT null OR
|
||||||
|
e.created_by in ('unknown', 'migration', 'init-api-tokens')
|
||||||
|
)`,
|
||||||
|
)
|
||||||
|
.orderBy('e.created_at', 'desc')
|
||||||
|
.limit(batchSize)
|
||||||
|
.select(['e.*', 'u.id AS userid', 't.username']);
|
||||||
|
|
||||||
|
const updatePromises = toUpdate.map(async (row) => {
|
||||||
|
if (
|
||||||
|
row.created_by === 'unknown' ||
|
||||||
|
row.created_by === 'migration' ||
|
||||||
|
(row.created_by === 'init-api-tokens' &&
|
||||||
|
row.type === 'api-token-created')
|
||||||
|
) {
|
||||||
|
return this.db(TABLE)
|
||||||
|
.update({ created_by_user_id: SYSTEM_USER_ID })
|
||||||
|
.where({ id: row.id });
|
||||||
|
} else if (row.userid) {
|
||||||
|
return this.db(TABLE)
|
||||||
|
.update({ created_by_user_id: row.userid })
|
||||||
|
.where({ id: row.id });
|
||||||
|
} else if (row.username) {
|
||||||
|
return this.db(TABLE)
|
||||||
|
.update({ created_by_user_id: ADMIN_TOKEN_USER.id })
|
||||||
|
.where({ id: row.id });
|
||||||
|
} else {
|
||||||
|
this.logger.warn(`Could not find user for event ${row.id}`);
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await Promise.all(updatePromises);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default EventStore;
|
export default EventStore;
|
||||||
|
@ -192,7 +192,7 @@ export const deferredExportImportTogglesService = (
|
|||||||
eventBus,
|
eventBus,
|
||||||
getLogger,
|
getLogger,
|
||||||
);
|
);
|
||||||
const eventStore = new EventStore(db, getLogger);
|
const eventStore = new EventStore(db, getLogger, flagResolver);
|
||||||
const accessService = createAccessService(db, config);
|
const accessService = createAccessService(db, config);
|
||||||
const featureToggleService = createFeatureToggleService(db, config);
|
const featureToggleService = createFeatureToggleService(db, config);
|
||||||
const privateProjectChecker = createPrivateProjectChecker(db, config);
|
const privateProjectChecker = createPrivateProjectChecker(db, config);
|
||||||
|
@ -77,7 +77,7 @@ export const createInstanceStatsService = (db: Db, config: IUnleashConfig) => {
|
|||||||
eventBus,
|
eventBus,
|
||||||
getLogger,
|
getLogger,
|
||||||
);
|
);
|
||||||
const eventStore = new EventStore(db, getLogger);
|
const eventStore = new EventStore(db, getLogger, flagResolver);
|
||||||
const apiTokenStore = new ApiTokenStore(db, eventBus, getLogger);
|
const apiTokenStore = new ApiTokenStore(db, eventBus, getLogger);
|
||||||
const clientMetricsStoreV2 = new ClientMetricsStoreV2(
|
const clientMetricsStoreV2 = new ClientMetricsStoreV2(
|
||||||
db,
|
db,
|
||||||
|
@ -45,7 +45,7 @@ export const createProjectService = (
|
|||||||
config: IUnleashConfig,
|
config: IUnleashConfig,
|
||||||
): ProjectService => {
|
): ProjectService => {
|
||||||
const { eventBus, getLogger, flagResolver } = config;
|
const { eventBus, getLogger, flagResolver } = config;
|
||||||
const eventStore = new EventStore(db, getLogger);
|
const eventStore = new EventStore(db, getLogger, flagResolver);
|
||||||
const projectStore = new ProjectStore(
|
const projectStore = new ProjectStore(
|
||||||
db,
|
db,
|
||||||
eventBus,
|
eventBus,
|
||||||
|
@ -25,6 +25,7 @@ export const scheduleServices = async (
|
|||||||
configurationRevisionService,
|
configurationRevisionService,
|
||||||
eventAnnouncerService,
|
eventAnnouncerService,
|
||||||
featureToggleService,
|
featureToggleService,
|
||||||
|
eventService,
|
||||||
versionService,
|
versionService,
|
||||||
lastSeenService,
|
lastSeenService,
|
||||||
proxyService,
|
proxyService,
|
||||||
@ -151,6 +152,11 @@ export const scheduleServices = async (
|
|||||||
'updateAccountLastSeen',
|
'updateAccountLastSeen',
|
||||||
);
|
);
|
||||||
|
|
||||||
|
schedulerService.schedule(
|
||||||
|
eventService.setEventCreatedByUserId.bind(eventService),
|
||||||
|
minutesToMilliseconds(2),
|
||||||
|
'setEventCreatedByUserId',
|
||||||
|
);
|
||||||
schedulerService.schedule(
|
schedulerService.schedule(
|
||||||
featureToggleService.setFeatureCreatedByUserIdFromEvents.bind(
|
featureToggleService.setFeatureCreatedByUserIdFromEvents.bind(
|
||||||
featureToggleService,
|
featureToggleService,
|
||||||
|
@ -17,4 +17,5 @@ export interface IEventStore
|
|||||||
getMaxRevisionId(currentMax?: number): Promise<number>;
|
getMaxRevisionId(currentMax?: number): Promise<number>;
|
||||||
query(operations: IQueryOperations[]): Promise<IEvent[]>;
|
query(operations: IQueryOperations[]): Promise<IEvent[]>;
|
||||||
queryCount(operations: IQueryOperations[]): Promise<number>;
|
queryCount(operations: IQueryOperations[]): Promise<number>;
|
||||||
|
setCreatedByUserId(batchSize: number): Promise<void>;
|
||||||
}
|
}
|
||||||
|
4
src/test/fixtures/fake-event-store.ts
vendored
4
src/test/fixtures/fake-event-store.ts
vendored
@ -125,6 +125,10 @@ class FakeEventStore implements IEventStore {
|
|||||||
publishUnannouncedEvents(): Promise<void> {
|
publishUnannouncedEvents(): Promise<void> {
|
||||||
throw new Error('Method not implemented.');
|
throw new Error('Method not implemented.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setCreatedByUserId(batchSize: number): Promise<void> {
|
||||||
|
throw new Error('Method not implemented.');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = FakeEventStore;
|
module.exports = FakeEventStore;
|
||||||
|
Loading…
Reference in New Issue
Block a user