mirror of
https://github.com/Unleash/unleash.git
synced 2025-07-31 13:47:02 +02:00
chore: prevent unknown flag deadlocks by sorting and batching inserts (#10348)
https://linear.app/unleash/issue/2-3692/prevent-deadlocks-by-sorting-and-batching-inserts Tries to prevent deadlocks by sorting and batching unknown flag inserts when flushing.
This commit is contained in:
parent
998834245c
commit
3d78fbea7f
@ -205,7 +205,7 @@ export const createStores = (
|
|||||||
releasePlanMilestoneStrategyStore:
|
releasePlanMilestoneStrategyStore:
|
||||||
new ReleasePlanMilestoneStrategyStore(db, config),
|
new ReleasePlanMilestoneStrategyStore(db, config),
|
||||||
featureLinkStore: new FeatureLinkStore(db, config),
|
featureLinkStore: new FeatureLinkStore(db, config),
|
||||||
unknownFlagsStore: new UnknownFlagsStore(db),
|
unknownFlagsStore: new UnknownFlagsStore(db, getLogger),
|
||||||
featureLinkReadModel: new FeatureLinksReadModel(db, eventBus),
|
featureLinkReadModel: new FeatureLinksReadModel(db, eventBus),
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
@ -46,6 +46,8 @@ export class UnknownFlagsService {
|
|||||||
|
|
||||||
const cached = Array.from(this.unknownFlagsCache.values());
|
const cached = Array.from(this.unknownFlagsCache.values());
|
||||||
|
|
||||||
|
cached.sort((a, b) => this.getKey(a).localeCompare(this.getKey(b)));
|
||||||
|
|
||||||
await this.unknownFlagsStore.insert(cached);
|
await this.unknownFlagsStore.insert(cached);
|
||||||
this.unknownFlagsCache.clear();
|
this.unknownFlagsCache.clear();
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
import type { Db } from '../../../db/db.js';
|
import type { Db } from '../../../db/db.js';
|
||||||
|
import type { Logger, LogProvider } from '../../../logger.js';
|
||||||
|
|
||||||
const TABLE = 'unknown_flags';
|
const TABLE = 'unknown_flags';
|
||||||
|
const MAX_INSERT_BATCH_SIZE = 100;
|
||||||
|
|
||||||
export type UnknownFlag = {
|
export type UnknownFlag = {
|
||||||
name: string;
|
name: string;
|
||||||
@ -28,22 +30,36 @@ export interface IUnknownFlagsStore {
|
|||||||
export class UnknownFlagsStore implements IUnknownFlagsStore {
|
export class UnknownFlagsStore implements IUnknownFlagsStore {
|
||||||
private db: Db;
|
private db: Db;
|
||||||
|
|
||||||
constructor(db: Db) {
|
private logger: Logger;
|
||||||
|
|
||||||
|
constructor(db: Db, getLogger: LogProvider) {
|
||||||
this.db = db;
|
this.db = db;
|
||||||
|
this.logger = getLogger('unknown-flags-store.ts');
|
||||||
}
|
}
|
||||||
|
|
||||||
async insert(flags: UnknownFlag[]): Promise<void> {
|
async insert(flags: UnknownFlag[]): Promise<void> {
|
||||||
if (flags.length > 0) {
|
if (!flags.length) return;
|
||||||
const rows = flags.map((flag) => ({
|
|
||||||
name: flag.name,
|
const rows = flags.map(({ name, appName, seenAt, environment }) => ({
|
||||||
app_name: flag.appName,
|
name,
|
||||||
seen_at: flag.seenAt,
|
app_name: appName,
|
||||||
environment: flag.environment,
|
seen_at: seenAt,
|
||||||
}));
|
environment,
|
||||||
await this.db(TABLE)
|
}));
|
||||||
.insert(rows)
|
|
||||||
.onConflict(['name', 'app_name', 'environment'])
|
for (let i = 0; i < rows.length; i += MAX_INSERT_BATCH_SIZE) {
|
||||||
.merge(['seen_at']);
|
const chunk = rows.slice(i, i + MAX_INSERT_BATCH_SIZE);
|
||||||
|
try {
|
||||||
|
await this.db(TABLE)
|
||||||
|
.insert(chunk)
|
||||||
|
.onConflict(['name', 'app_name', 'environment'])
|
||||||
|
.merge(['seen_at']);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.debug(
|
||||||
|
`unknown_flags: batch ${i / MAX_INSERT_BATCH_SIZE + 1} failed and was skipped.`,
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user