1
0
mirror of https://github.com/Unleash/unleash.git synced 2025-05-31 01:16:01 +02:00

feat: add support for bulk operations on client apps/instance registr… (#744)

* feat: add support for bulk operations on client apps/instance registration
-- stores client registrations for 5 seconds, then runs distinct and inserts them into db

fixes: #732
This commit is contained in:
Christopher Kolstad 2021-03-04 12:54:13 +01:00 committed by GitHub
parent aaab3fbb57
commit 4808eb32ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 465 additions and 120 deletions

View File

@ -45,7 +45,7 @@
"nyc": {
"all": true,
"include": [
"dist/**/*.js"
"dist/lib/**/*.js"
],
"exclude": [
"dist/bin",
@ -164,7 +164,7 @@
"trailingComma": "all",
"overrides": [
{
"files": "*.{json,yaml,yml,md}",
"files": "*.{json,yaml,yml,md,ts}",
"options": {
"tabWidth": 2
}

View File

@ -41,31 +41,28 @@ class ClientApplicationsDb {
this.eventBus = eventBus;
}
async updateRow(details, prev) {
// eslint-disable-next-line no-param-reassign
details.updatedAt = 'now()';
async upsert(details) {
return this.db(TABLE)
.where('app_name', details.appName)
.update(remapRow(details, prev));
.insert(remapRow(details))
.onConflict('app_name')
.merge();
}
async insertNewRow(details) {
return this.db(TABLE).insert(remapRow(details));
async bulkUpsert(apps) {
const rows = apps.map(remapRow);
return this.db(TABLE)
.insert(rows)
.onConflict('app_name')
.merge();
}
async upsert(data) {
if (!data) {
throw new Error('Missing data to add / update');
}
return this.db(TABLE)
.select(COLUMNS)
.where('app_name', data.appName)
.then(result => {
if (result && result[0]) {
return this.updateRow(data, result[0]);
}
return this.insertNewRow(data);
});
async exists({ appName }) {
const result = await this.db.raw(
`SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE app_name = ?) AS present`,
[appName],
);
const { present } = result.rows[0];
return present;
}
async getAll() {

View File

@ -26,6 +26,14 @@ const mapRow = row => ({
createdAt: row.created_at,
});
const mapToDb = client => ({
app_name: client.appName,
instance_id: client.instanceId,
sdk_version: client.sdkVersion || '',
client_ip: client.clientIp,
last_seen: client.lastSeen || 'now()',
});
class ClientInstanceStore {
constructor(db, eventBus, getLogger) {
this.db = db;
@ -51,42 +59,30 @@ class ClientInstanceStore {
}
}
async updateRow(details) {
async bulkUpsert(instances) {
const rows = instances.map(mapToDb);
return this.db(TABLE)
.where('app_name', details.appName)
.where('instance_id', details.instanceId)
.update({
last_seen: 'now()',
client_ip: details.clientIp,
sdk_version: details.sdkVersion,
});
.insert(rows)
.onConflict(['app_name', 'instance_id'])
.merge();
}
async insertNewRow(details) {
return this.db(TABLE).insert({
app_name: details.appName,
instance_id: details.instanceId,
sdk_version: details.sdkVersion,
client_ip: details.clientIp,
});
async exists({ appName, instanceId }) {
const result = await this.db.raw(
`SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE app_name = ? AND instance_id = ?) AS present`,
[appName, instanceId],
);
const { present } = result.rows[0];
return present;
}
async insert(details) {
const stopTimer = this.metricTimer('insert');
const result = await this.db(TABLE)
.count('*')
.where('app_name', details.appName)
.where('instance_id', details.instanceId)
.first();
let item;
if (Number(result.count) > 0) {
item = await this.updateRow(details);
} else {
item = await this.insertNewRow(details);
}
const item = await this.db(TABLE)
.insert(mapToDb(details))
.onConflict(['app_name', 'instance_id'])
.merge();
stopTimer();

View File

@ -33,6 +33,24 @@ class EventStore extends EventEmitter {
}
}
async batchStore(events) {
try {
await this.db('events').insert(events.map(this.eventToDbRow));
process.nextTick(() => events.forEach(e => this.emit(e.type, e)));
} catch (e) {
this.logger.warn('Failed to store events');
}
}
eventToDbRow(e) {
return {
type: e.type,
created_by: e.createdBy,
data: e.data,
tags: e.tags ? JSON.stringify(e.tags) : [],
};
}
async getEvents() {
try {
const rows = await this.db

View File

@ -0,0 +1,16 @@
const test = require('ava');
const EventStore = require('./event-store');
const getLogger = require('../../test/fixtures/no-logger');
test('Trying to get events if db fails should yield empty list', async t => {
const store = new EventStore({}, getLogger);
const events = await store.getEvents();
t.is(events.length, 0);
});
test('Trying to get events by name if db fails should yield empty list', async t => {
const store = new EventStore({}, getLogger);
const events = await store.getEventsFilterByName('application-created');
t.truthy(events);
t.is(events.length, 0);
});

View File

@ -617,6 +617,7 @@ test('Tags should be included in updated events', async t => {
test('Trying to get features while database is down should yield 500', t => {
t.plan(0);
getLogger.setMuteError(true);
const { request, base } = getSetup(false);
return request.get(`${base}/api/admin/features`).expect(500);
});

View File

@ -121,6 +121,7 @@ test('should be able to filter by type', t => {
test('Getting tags while database is down should be a 500', t => {
t.plan(0);
getLogger.setMuteError(true);
const { request, base } = getSetup(false);
return request.get(`${base}/api/admin/tags`).expect(500);
});

View File

@ -47,7 +47,7 @@ test('should register client', t => {
.expect(202);
});
test('should register client without sdkVersin', t => {
test('should register client without sdkVersion', t => {
t.plan(0);
const { request } = getSetup();
return request
@ -97,7 +97,6 @@ test('should fail if store fails', t => {
throw new Error('opps');
},
};
const app = getApp({
baseUriPath: '',
stores,

View File

@ -2,7 +2,7 @@
const test = require('ava');
const moment = require('moment');
const lolex = require('lolex');
const sinon = require('sinon');
const { EventEmitter } = require('events');
const UnleashClientMetrics = require('./index');
@ -26,7 +26,7 @@ test('should work without state', t => {
});
test.cb('data should expire', t => {
const clock = lolex.install();
const clock = sinon.useFakeTimers();
const clientMetricsStore = new EventEmitter();
const metrics = new UnleashClientMetrics(
@ -67,7 +67,7 @@ test.cb('data should expire', t => {
t.true(lastMinExpires === 1);
t.true(lastHourExpires === 1);
clock.uninstall();
clock.restore();
t.end();
});
@ -132,7 +132,7 @@ test('should listen to metrics from store', t => {
metrics.destroy();
});
test('should build up list of seend toggles when new metrics arrives', t => {
test('should build up list of seen toggles when new metrics arrives', t => {
const clientMetricsStore = new EventEmitter();
const metrics = new UnleashClientMetrics(
{ clientMetricsStore },
@ -200,7 +200,7 @@ test('should handle a lot of toggles', t => {
});
test('should have correct values for lastMinute', t => {
const clock = lolex.install();
const clock = sinon.useFakeTimers();
const clientMetricsStore = new EventEmitter();
const metrics = new UnleashClientMetrics(
@ -271,11 +271,11 @@ test('should have correct values for lastMinute', t => {
t.deepEqual(c.lastMinute.toggle, { yes: 0, no: 0 });
metrics.destroy();
clock.uninstall();
clock.restore();
});
test('should have correct values for lastHour', t => {
const clock = lolex.install();
const clock = sinon.useFakeTimers();
const clientMetricsStore = new EventEmitter();
const metrics = new UnleashClientMetrics(
@ -356,7 +356,7 @@ test('should have correct values for lastHour', t => {
t.deepEqual(c.lastHour.toggle, { yes: 0, no: 0 });
metrics.destroy();
clock.uninstall();
clock.restore();
});
test('should not fail when toggle metrics is missing yes/no field', t => {
@ -403,3 +403,145 @@ test('should not fail when toggle metrics is missing yes/no field', t => {
metrics.destroy();
});
test('Multiple registrations of same appname and instanceid within same time period should only cause one registration', async t => {
const clock = sinon.useFakeTimers(); // sinon has superseded lolex
const clientMetricsStore = new EventEmitter();
const appStoreSpy = sinon.spy();
const bulkSpy = sinon.spy();
const clientApplicationsStore = {
bulkUpsert: appStoreSpy,
};
const clientInstanceStore = {
bulkUpsert: bulkSpy,
};
const clientMetrics = new UnleashClientMetrics(
{ clientMetricsStore, clientApplicationsStore, clientInstanceStore },
{ getLogger },
);
const client1 = {
appName: 'test_app',
instanceId: 'ava',
strategies: [{ name: 'defaullt' }],
started: new Date(),
interval: 10,
};
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clock.tickAsync(7 * 1000);
t.is(appStoreSpy.callCount, 1);
t.is(bulkSpy.callCount, 1);
const registrations = appStoreSpy.firstCall.args[0];
t.is(registrations.length, 1);
t.is(registrations[0].appName, client1.appName);
t.is(registrations[0].instanceId, client1.instanceId);
t.is(registrations[0].started, client1.started);
t.is(registrations[0].interval, client1.interval);
clock.restore();
});
test('Multiple unique clients causes multiple registrations', async t => {
const clock = sinon.useFakeTimers();
const clientMetricsStore = new EventEmitter();
const appStoreSpy = sinon.spy();
const bulkSpy = sinon.spy();
const clientApplicationsStore = {
bulkUpsert: appStoreSpy,
};
const clientInstanceStore = {
bulkUpsert: bulkSpy,
};
const clientMetrics = new UnleashClientMetrics(
{ clientMetricsStore, clientApplicationsStore, clientInstanceStore },
{ getLogger },
);
const client1 = {
appName: 'test_app',
instanceId: 'client1',
strategies: [{ name: 'defaullt' }],
started: new Date(),
interval: 10,
};
const client2 = {
appName: 'test_app_2',
instanceId: 'client2',
strategies: [{ name: 'defaullt' }],
started: new Date(),
interval: 10,
};
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client2, '127.0.0.1');
await clientMetrics.registerClient(client2, '127.0.0.1');
await clientMetrics.registerClient(client2, '127.0.0.1');
await clock.tickAsync(7 * 1000);
t.is(appStoreSpy.callCount, 1);
t.is(bulkSpy.callCount, 1);
const registrations = appStoreSpy.firstCall.args[0];
t.is(registrations.length, 2);
clock.restore();
});
test('Same client registered outside of dedup interval will be registered twice', async t => {
const clock = sinon.useFakeTimers(); // sinon has superseded lolex
const clientMetricsStore = new EventEmitter();
const appStoreSpy = sinon.spy();
const bulkSpy = sinon.spy();
const clientApplicationsStore = {
bulkUpsert: appStoreSpy,
};
const clientInstanceStore = {
bulkUpsert: bulkSpy,
};
const bulkInterval = 2000;
const clientMetrics = new UnleashClientMetrics(
{ clientMetricsStore, clientApplicationsStore, clientInstanceStore },
{ getLogger, bulkInterval },
);
const client1 = {
appName: 'test_app',
instanceId: 'client1',
strategies: [{ name: 'defaullt' }],
started: new Date(),
interval: 10,
};
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clock.tickAsync(3 * 1000);
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clientMetrics.registerClient(client1, '127.0.0.1');
await clock.tickAsync(3 * 1000);
t.is(appStoreSpy.callCount, 2);
t.is(bulkSpy.callCount, 2);
const firstRegistrations = appStoreSpy.firstCall.args[0];
const secondRegistrations = appStoreSpy.secondCall.args[0];
t.is(firstRegistrations[0].appName, secondRegistrations[0].appName);
t.is(firstRegistrations[0].instanceId, secondRegistrations[0].instanceId);
clock.restore();
});
test('No registrations during a time period will not call stores', async t => {
const clock = sinon.useFakeTimers(); // sinon has superseded lolex
const clientMetricsStore = new EventEmitter();
const appStoreSpy = sinon.spy();
const bulkSpy = sinon.spy();
const clientApplicationsStore = {
bulkUpsert: appStoreSpy,
};
const clientInstanceStore = {
bulkUpsert: bulkSpy,
};
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const metrics = new UnleashClientMetrics(
{ clientMetricsStore, clientApplicationsStore, clientInstanceStore },
{ getLogger },
);
await clock.tickAsync(6 * 1000);
t.is(appStoreSpy.callCount, 0);
t.is(bulkSpy.callCount, 0);
clock.restore();
});

View File

@ -5,11 +5,12 @@
const Projection = require('./projection.js');
const TTLList = require('./ttl-list.js');
const appSchema = require('./metrics-schema');
const NotFoundError = require('../../error/notfound-error');
const { clientMetricsSchema } = require('./client-metrics-schema');
const { clientRegisterSchema } = require('./register-schema');
const { APPLICATION_CREATED } = require('../../event-type');
const FIVE_SECONDS = 5 * 1000;
module.exports = class ClientMetricsService {
constructor(
{
@ -20,7 +21,7 @@ module.exports = class ClientMetricsService {
clientInstanceStore,
eventStore,
},
{ getLogger },
{ getLogger, bulkInterval = FIVE_SECONDS },
) {
this.globalCount = 0;
this.apps = {};
@ -60,6 +61,8 @@ module.exports = class ClientMetricsService {
);
});
});
this.seenClients = {};
setInterval(() => this.bulkAdd(), bulkInterval);
clientMetricsStore.on('metrics', m => this.addPayload(m));
}
@ -75,30 +78,53 @@ module.exports = class ClientMetricsService {
});
}
async upsertApp(value, clientIp) {
try {
const app = await this.clientAppStore.getApplication(value.appName);
await this.updateRow(value, app);
} catch (error) {
if (error instanceof NotFoundError) {
await this.clientAppStore.insertNewRow(value);
await this.eventStore.store({
type: APPLICATION_CREATED,
createdBy: clientIp,
data: value,
});
async registerClient(data, clientIp) {
const value = await clientRegisterSchema.validateAsync(data);
value.clientIp = clientIp;
this.logger.info(`${JSON.stringify(data)}`);
this.seenClients[this.clientKey(value)] = value;
}
clientKey(client) {
return `${client.appName}_${client.instanceId}`;
}
async bulkAdd() {
if (
this &&
this.seenClients &&
this.clientAppStore &&
this.clientInstanceStore
) {
const uniqueRegistrations = Object.values(this.seenClients);
const uniqueApps = Object.values(
uniqueRegistrations.reduce((soFar, reg) => {
soFar[reg.appName] = reg;
return soFar;
}, {}),
);
this.seenClients = {};
try {
if (uniqueRegistrations.length > 0) {
await this.clientAppStore.bulkUpsert(uniqueApps);
await this.clientInstanceStore.bulkUpsert(
uniqueRegistrations,
);
} else {
this.logger.debug('No registrations in last time period');
}
} catch (err) {
this.logger.warn('Failed to register clients', err);
}
}
}
async registerClient(data, clientIp) {
const value = await clientRegisterSchema.validateAsync(data);
value.clientIp = clientIp;
await this.upsertApp(value, clientIp);
await this.clientInstanceStore.insert(value);
this.logger.info(
`New client registration: appName=${value.appName}, instanceId=${value.instanceId}`,
);
appToEvent(app) {
return {
type: APPLICATION_CREATED,
createdBy: app.clientIp,
data: app,
};
}
getAppsWithToggles() {

View File

@ -0,0 +1,20 @@
'use strict';
exports.up = function(db, cb) {
db.runSql(
`
ALTER TABLE client_instances ADD PRIMARY KEY (app_name, instance_id);
`,
cb,
);
};
exports.down = function(db, cb) {
db.runSql(
`
ALTER TABLE client_instances DROP CONSTRAINT client_instances_pkey;
DROP INDEX client_instance_pkey
`,
cb,
);
};

View File

@ -38,44 +38,6 @@ test.serial('should require valid send metrics', async t => {
.expect(400);
});
test.serial('should register client', async t => {
t.plan(0);
const request = await setupApp(stores);
return request
.post('/api/client/register')
.send({
appName: 'demo',
instanceId: 'test',
strategies: ['default'],
started: Date.now(),
interval: 10,
})
.expect(202);
});
test.serial('should allow client to register multiple times', async t => {
t.plan(0);
const request = await setupApp(stores);
const clientRegistration = {
appName: 'multipleRegistration',
instanceId: 'test',
strategies: ['default', 'another'],
started: Date.now(),
interval: 10,
};
return request
.post('/api/client/register')
.send(clientRegistration)
.expect(202)
.then(() =>
request
.post('/api/client/register')
.send(clientRegistration)
.expect(202),
);
});
test.serial('should accept client metrics', async t => {
t.plan(0);
const request = await setupApp(stores);

View File

@ -0,0 +1,111 @@
'use strict';
const test = require('ava');
const sinon = require('sinon');
const faker = require('faker');
const { setupApp } = require('../../helpers/test-helper');
const dbInit = require('../../helpers/database-init');
const getLogger = require('../../../fixtures/no-logger');
const version = require('../../../../lib/util/version');
const asyncFilter = async (arr, predicate) => {
const results = await Promise.all(arr.map(predicate));
return arr.filter((_v, index) => results[index]);
};
let stores;
test.before(async () => {
const db = await dbInit('register_client', getLogger);
stores = db.stores;
});
test.after(async () => {
await stores.db.destroy();
});
test.serial('should register client', async t => {
t.plan(0);
const request = await setupApp(stores);
return request
.post('/api/client/register')
.send({
appName: 'demo',
instanceId: 'test',
strategies: ['default'],
started: Date.now(),
interval: 10,
})
.expect(202);
});
test.serial('should allow client to register multiple times', async t => {
t.plan(2);
const clock = sinon.useFakeTimers();
const { clientInstanceStore, clientApplicationsStore } = stores;
const request = await setupApp(stores);
const clientRegistration = {
appName: 'multipleRegistration',
instanceId: 'test',
strategies: ['default', 'another'],
started: Date.now(),
interval: 10,
};
await request
.post('/api/client/register')
.send(clientRegistration)
.expect(202)
.then(() =>
request
.post('/api/client/register')
.send(clientRegistration)
.expect(202),
);
await clock.tickAsync(6 * 1000);
t.assert(clientApplicationsStore.exists(clientRegistration));
t.assert(clientInstanceStore.exists(clientRegistration));
clock.restore();
});
test.serial.skip('Should handle a massive bulk registration', async t => {
const { clientInstanceStore, clientApplicationsStore } = stores;
const request = await setupApp(stores);
const clients = [];
while (clients.length < 2000) {
const clientRegistration = {
appName: faker.internet.domainName(),
instanceId: faker.random.uuid(),
strategies: ['default'],
started: Date.now(),
interval: faker.random.number(),
sdkVersion: version,
icon: '',
description: faker.company.catchPhrase(),
color: faker.internet.color(),
};
clients.push(clientRegistration);
// eslint-disable-next-line no-await-in-loop
await request
.post('/api/client/register')
.send(clientRegistration)
.expect(202);
}
t.is(clients.length, 2000);
await new Promise(res => setTimeout(res, 5500));
// Verify clientInstance
const notSavedInstance = await asyncFilter(clients, async c => {
const exists = await clientInstanceStore.exists(c);
return !exists;
});
t.is(notSavedInstance.length, 0);
// Verify application
const notSavedApp = await asyncFilter(clients, async c => {
const exists = await clientApplicationsStore.exists(c);
return !exists;
});
t.is(notSavedApp.length, 0);
});

View File

@ -0,0 +1,56 @@
'use strict';
const test = require('ava');
const sinon = require('sinon');
const { APPLICATION_CREATED } = require('../../../lib/event-type');
const dbInit = require('../helpers/database-init');
const getLogger = require('../../fixtures/no-logger');
let stores;
let eventStore;
test.before(async () => {
const db = await dbInit('event_store_serial', getLogger);
stores = db.stores;
eventStore = stores.eventStore;
});
test.after(async () => {
await stores.db.destroy();
});
test.serial('Should be able to store multiple events at once', async t => {
const clock = sinon.useFakeTimers();
const event1 = {
type: APPLICATION_CREATED,
createdBy: '127.0.0.1',
data: {
clientIp: '127.0.0.1',
appName: 'test1',
},
};
const event2 = {
type: APPLICATION_CREATED,
createdBy: '127.0.0.1',
data: {
clientIp: '127.0.0.1',
appName: 'test2',
},
};
const event3 = {
type: APPLICATION_CREATED,
createdBy: '127.0.0.1',
data: {
clientIp: '127.0.0.1',
appName: 'test3',
},
tags: [{ type: 'simple', value: 'mytest' }],
};
let seen = 0;
eventStore.on(APPLICATION_CREATED, () => seen++);
await eventStore.batchStore([event1, event2, event3]);
await clock.tickAsync(100);
t.is(seen, 3);
clock.restore();
});