diff --git a/package.json b/package.json index 363bd31f0d..9eee86f2e2 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "nyc": { "all": true, "include": [ - "dist/**/*.js" + "dist/lib/**/*.js" ], "exclude": [ "dist/bin", @@ -164,7 +164,7 @@ "trailingComma": "all", "overrides": [ { - "files": "*.{json,yaml,yml,md}", + "files": "*.{json,yaml,yml,md,ts}", "options": { "tabWidth": 2 } diff --git a/src/lib/db/client-applications-store.js b/src/lib/db/client-applications-store.js index f0a9c760c5..be40d75d47 100644 --- a/src/lib/db/client-applications-store.js +++ b/src/lib/db/client-applications-store.js @@ -41,31 +41,28 @@ class ClientApplicationsDb { this.eventBus = eventBus; } - async updateRow(details, prev) { - // eslint-disable-next-line no-param-reassign - details.updatedAt = 'now()'; + async upsert(details) { return this.db(TABLE) - .where('app_name', details.appName) - .update(remapRow(details, prev)); + .insert(remapRow(details)) + .onConflict('app_name') + .merge(); } - async insertNewRow(details) { - return this.db(TABLE).insert(remapRow(details)); + async bulkUpsert(apps) { + const rows = apps.map(remapRow); + return this.db(TABLE) + .insert(rows) + .onConflict('app_name') + .merge(); } - async upsert(data) { - if (!data) { - throw new Error('Missing data to add / update'); - } - return this.db(TABLE) - .select(COLUMNS) - .where('app_name', data.appName) - .then(result => { - if (result && result[0]) { - return this.updateRow(data, result[0]); - } - return this.insertNewRow(data); - }); + async exists({ appName }) { + const result = await this.db.raw( + `SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE app_name = ?) AS present`, + [appName], + ); + const { present } = result.rows[0]; + return present; } async getAll() { diff --git a/src/lib/db/client-instance-store.js b/src/lib/db/client-instance-store.js index 5512134e8b..de52b6d6f7 100644 --- a/src/lib/db/client-instance-store.js +++ b/src/lib/db/client-instance-store.js @@ -26,6 +26,14 @@ const mapRow = row => ({ createdAt: row.created_at, }); +const mapToDb = client => ({ + app_name: client.appName, + instance_id: client.instanceId, + sdk_version: client.sdkVersion || '', + client_ip: client.clientIp, + last_seen: client.lastSeen || 'now()', +}); + class ClientInstanceStore { constructor(db, eventBus, getLogger) { this.db = db; @@ -51,42 +59,30 @@ class ClientInstanceStore { } } - async updateRow(details) { + async bulkUpsert(instances) { + const rows = instances.map(mapToDb); return this.db(TABLE) - .where('app_name', details.appName) - .where('instance_id', details.instanceId) - .update({ - last_seen: 'now()', - client_ip: details.clientIp, - sdk_version: details.sdkVersion, - }); + .insert(rows) + .onConflict(['app_name', 'instance_id']) + .merge(); } - async insertNewRow(details) { - return this.db(TABLE).insert({ - app_name: details.appName, - instance_id: details.instanceId, - sdk_version: details.sdkVersion, - client_ip: details.clientIp, - }); + async exists({ appName, instanceId }) { + const result = await this.db.raw( + `SELECT EXISTS (SELECT 1 FROM ${TABLE} WHERE app_name = ? AND instance_id = ?) AS present`, + [appName, instanceId], + ); + const { present } = result.rows[0]; + return present; } async insert(details) { const stopTimer = this.metricTimer('insert'); - const result = await this.db(TABLE) - .count('*') - .where('app_name', details.appName) - .where('instance_id', details.instanceId) - .first(); - - let item; - - if (Number(result.count) > 0) { - item = await this.updateRow(details); - } else { - item = await this.insertNewRow(details); - } + const item = await this.db(TABLE) + .insert(mapToDb(details)) + .onConflict(['app_name', 'instance_id']) + .merge(); stopTimer(); diff --git a/src/lib/db/event-store.js b/src/lib/db/event-store.js index dba4517d89..bf92156b5d 100644 --- a/src/lib/db/event-store.js +++ b/src/lib/db/event-store.js @@ -33,6 +33,24 @@ class EventStore extends EventEmitter { } } + async batchStore(events) { + try { + await this.db('events').insert(events.map(this.eventToDbRow)); + process.nextTick(() => events.forEach(e => this.emit(e.type, e))); + } catch (e) { + this.logger.warn('Failed to store events'); + } + } + + eventToDbRow(e) { + return { + type: e.type, + created_by: e.createdBy, + data: e.data, + tags: e.tags ? JSON.stringify(e.tags) : [], + }; + } + async getEvents() { try { const rows = await this.db diff --git a/src/lib/db/event-store.test.js b/src/lib/db/event-store.test.js new file mode 100644 index 0000000000..0bfe99b574 --- /dev/null +++ b/src/lib/db/event-store.test.js @@ -0,0 +1,16 @@ +const test = require('ava'); +const EventStore = require('./event-store'); +const getLogger = require('../../test/fixtures/no-logger'); + +test('Trying to get events if db fails should yield empty list', async t => { + const store = new EventStore({}, getLogger); + const events = await store.getEvents(); + t.is(events.length, 0); +}); + +test('Trying to get events by name if db fails should yield empty list', async t => { + const store = new EventStore({}, getLogger); + const events = await store.getEventsFilterByName('application-created'); + t.truthy(events); + t.is(events.length, 0); +}); diff --git a/src/lib/routes/admin-api/feature.test.js b/src/lib/routes/admin-api/feature.test.js index 84eb57146d..9879fe9be9 100644 --- a/src/lib/routes/admin-api/feature.test.js +++ b/src/lib/routes/admin-api/feature.test.js @@ -617,6 +617,7 @@ test('Tags should be included in updated events', async t => { test('Trying to get features while database is down should yield 500', t => { t.plan(0); + getLogger.setMuteError(true); const { request, base } = getSetup(false); return request.get(`${base}/api/admin/features`).expect(500); }); diff --git a/src/lib/routes/admin-api/tag.test.js b/src/lib/routes/admin-api/tag.test.js index b981b4445b..f7963e92aa 100644 --- a/src/lib/routes/admin-api/tag.test.js +++ b/src/lib/routes/admin-api/tag.test.js @@ -121,6 +121,7 @@ test('should be able to filter by type', t => { test('Getting tags while database is down should be a 500', t => { t.plan(0); + getLogger.setMuteError(true); const { request, base } = getSetup(false); return request.get(`${base}/api/admin/tags`).expect(500); }); diff --git a/src/lib/routes/client-api/register.test.js b/src/lib/routes/client-api/register.test.js index 785584dd27..15bf80bfdd 100644 --- a/src/lib/routes/client-api/register.test.js +++ b/src/lib/routes/client-api/register.test.js @@ -47,7 +47,7 @@ test('should register client', t => { .expect(202); }); -test('should register client without sdkVersin', t => { +test('should register client without sdkVersion', t => { t.plan(0); const { request } = getSetup(); return request @@ -97,7 +97,6 @@ test('should fail if store fails', t => { throw new Error('opps'); }, }; - const app = getApp({ baseUriPath: '', stores, diff --git a/src/lib/services/client-metrics/client-metrics.test.js b/src/lib/services/client-metrics/client-metrics.test.js index 99051d2c25..2faa21bb96 100644 --- a/src/lib/services/client-metrics/client-metrics.test.js +++ b/src/lib/services/client-metrics/client-metrics.test.js @@ -2,7 +2,7 @@ const test = require('ava'); const moment = require('moment'); -const lolex = require('lolex'); +const sinon = require('sinon'); const { EventEmitter } = require('events'); const UnleashClientMetrics = require('./index'); @@ -26,7 +26,7 @@ test('should work without state', t => { }); test.cb('data should expire', t => { - const clock = lolex.install(); + const clock = sinon.useFakeTimers(); const clientMetricsStore = new EventEmitter(); const metrics = new UnleashClientMetrics( @@ -67,7 +67,7 @@ test.cb('data should expire', t => { t.true(lastMinExpires === 1); t.true(lastHourExpires === 1); - clock.uninstall(); + clock.restore(); t.end(); }); @@ -132,7 +132,7 @@ test('should listen to metrics from store', t => { metrics.destroy(); }); -test('should build up list of seend toggles when new metrics arrives', t => { +test('should build up list of seen toggles when new metrics arrives', t => { const clientMetricsStore = new EventEmitter(); const metrics = new UnleashClientMetrics( { clientMetricsStore }, @@ -200,7 +200,7 @@ test('should handle a lot of toggles', t => { }); test('should have correct values for lastMinute', t => { - const clock = lolex.install(); + const clock = sinon.useFakeTimers(); const clientMetricsStore = new EventEmitter(); const metrics = new UnleashClientMetrics( @@ -271,11 +271,11 @@ test('should have correct values for lastMinute', t => { t.deepEqual(c.lastMinute.toggle, { yes: 0, no: 0 }); metrics.destroy(); - clock.uninstall(); + clock.restore(); }); test('should have correct values for lastHour', t => { - const clock = lolex.install(); + const clock = sinon.useFakeTimers(); const clientMetricsStore = new EventEmitter(); const metrics = new UnleashClientMetrics( @@ -356,7 +356,7 @@ test('should have correct values for lastHour', t => { t.deepEqual(c.lastHour.toggle, { yes: 0, no: 0 }); metrics.destroy(); - clock.uninstall(); + clock.restore(); }); test('should not fail when toggle metrics is missing yes/no field', t => { @@ -403,3 +403,145 @@ test('should not fail when toggle metrics is missing yes/no field', t => { metrics.destroy(); }); + +test('Multiple registrations of same appname and instanceid within same time period should only cause one registration', async t => { + const clock = sinon.useFakeTimers(); // sinon has superseded lolex + const clientMetricsStore = new EventEmitter(); + const appStoreSpy = sinon.spy(); + const bulkSpy = sinon.spy(); + const clientApplicationsStore = { + bulkUpsert: appStoreSpy, + }; + const clientInstanceStore = { + bulkUpsert: bulkSpy, + }; + const clientMetrics = new UnleashClientMetrics( + { clientMetricsStore, clientApplicationsStore, clientInstanceStore }, + { getLogger }, + ); + const client1 = { + appName: 'test_app', + instanceId: 'ava', + strategies: [{ name: 'defaullt' }], + started: new Date(), + interval: 10, + }; + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clock.tickAsync(7 * 1000); + t.is(appStoreSpy.callCount, 1); + t.is(bulkSpy.callCount, 1); + const registrations = appStoreSpy.firstCall.args[0]; + t.is(registrations.length, 1); + t.is(registrations[0].appName, client1.appName); + t.is(registrations[0].instanceId, client1.instanceId); + t.is(registrations[0].started, client1.started); + t.is(registrations[0].interval, client1.interval); + clock.restore(); +}); + +test('Multiple unique clients causes multiple registrations', async t => { + const clock = sinon.useFakeTimers(); + const clientMetricsStore = new EventEmitter(); + const appStoreSpy = sinon.spy(); + const bulkSpy = sinon.spy(); + const clientApplicationsStore = { + bulkUpsert: appStoreSpy, + }; + const clientInstanceStore = { + bulkUpsert: bulkSpy, + }; + const clientMetrics = new UnleashClientMetrics( + { clientMetricsStore, clientApplicationsStore, clientInstanceStore }, + { getLogger }, + ); + const client1 = { + appName: 'test_app', + instanceId: 'client1', + strategies: [{ name: 'defaullt' }], + started: new Date(), + interval: 10, + }; + const client2 = { + appName: 'test_app_2', + instanceId: 'client2', + strategies: [{ name: 'defaullt' }], + started: new Date(), + interval: 10, + }; + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client2, '127.0.0.1'); + await clientMetrics.registerClient(client2, '127.0.0.1'); + await clientMetrics.registerClient(client2, '127.0.0.1'); + await clock.tickAsync(7 * 1000); + t.is(appStoreSpy.callCount, 1); + t.is(bulkSpy.callCount, 1); + const registrations = appStoreSpy.firstCall.args[0]; + t.is(registrations.length, 2); + clock.restore(); +}); +test('Same client registered outside of dedup interval will be registered twice', async t => { + const clock = sinon.useFakeTimers(); // sinon has superseded lolex + const clientMetricsStore = new EventEmitter(); + const appStoreSpy = sinon.spy(); + const bulkSpy = sinon.spy(); + const clientApplicationsStore = { + bulkUpsert: appStoreSpy, + }; + const clientInstanceStore = { + bulkUpsert: bulkSpy, + }; + const bulkInterval = 2000; + const clientMetrics = new UnleashClientMetrics( + { clientMetricsStore, clientApplicationsStore, clientInstanceStore }, + { getLogger, bulkInterval }, + ); + const client1 = { + appName: 'test_app', + instanceId: 'client1', + strategies: [{ name: 'defaullt' }], + started: new Date(), + interval: 10, + }; + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clock.tickAsync(3 * 1000); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clientMetrics.registerClient(client1, '127.0.0.1'); + await clock.tickAsync(3 * 1000); + t.is(appStoreSpy.callCount, 2); + t.is(bulkSpy.callCount, 2); + const firstRegistrations = appStoreSpy.firstCall.args[0]; + const secondRegistrations = appStoreSpy.secondCall.args[0]; + t.is(firstRegistrations[0].appName, secondRegistrations[0].appName); + t.is(firstRegistrations[0].instanceId, secondRegistrations[0].instanceId); + clock.restore(); +}); + +test('No registrations during a time period will not call stores', async t => { + const clock = sinon.useFakeTimers(); // sinon has superseded lolex + const clientMetricsStore = new EventEmitter(); + const appStoreSpy = sinon.spy(); + const bulkSpy = sinon.spy(); + const clientApplicationsStore = { + bulkUpsert: appStoreSpy, + }; + const clientInstanceStore = { + bulkUpsert: bulkSpy, + }; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const metrics = new UnleashClientMetrics( + { clientMetricsStore, clientApplicationsStore, clientInstanceStore }, + { getLogger }, + ); + await clock.tickAsync(6 * 1000); + t.is(appStoreSpy.callCount, 0); + t.is(bulkSpy.callCount, 0); + clock.restore(); +}); diff --git a/src/lib/services/client-metrics/index.js b/src/lib/services/client-metrics/index.js index 6ad66b03a2..ee83a285fd 100644 --- a/src/lib/services/client-metrics/index.js +++ b/src/lib/services/client-metrics/index.js @@ -5,11 +5,12 @@ const Projection = require('./projection.js'); const TTLList = require('./ttl-list.js'); const appSchema = require('./metrics-schema'); -const NotFoundError = require('../../error/notfound-error'); const { clientMetricsSchema } = require('./client-metrics-schema'); const { clientRegisterSchema } = require('./register-schema'); const { APPLICATION_CREATED } = require('../../event-type'); +const FIVE_SECONDS = 5 * 1000; + module.exports = class ClientMetricsService { constructor( { @@ -20,7 +21,7 @@ module.exports = class ClientMetricsService { clientInstanceStore, eventStore, }, - { getLogger }, + { getLogger, bulkInterval = FIVE_SECONDS }, ) { this.globalCount = 0; this.apps = {}; @@ -60,6 +61,8 @@ module.exports = class ClientMetricsService { ); }); }); + this.seenClients = {}; + setInterval(() => this.bulkAdd(), bulkInterval); clientMetricsStore.on('metrics', m => this.addPayload(m)); } @@ -75,30 +78,53 @@ module.exports = class ClientMetricsService { }); } - async upsertApp(value, clientIp) { - try { - const app = await this.clientAppStore.getApplication(value.appName); - await this.updateRow(value, app); - } catch (error) { - if (error instanceof NotFoundError) { - await this.clientAppStore.insertNewRow(value); - await this.eventStore.store({ - type: APPLICATION_CREATED, - createdBy: clientIp, - data: value, - }); + async registerClient(data, clientIp) { + const value = await clientRegisterSchema.validateAsync(data); + value.clientIp = clientIp; + this.logger.info(`${JSON.stringify(data)}`); + this.seenClients[this.clientKey(value)] = value; + } + + clientKey(client) { + return `${client.appName}_${client.instanceId}`; + } + + async bulkAdd() { + if ( + this && + this.seenClients && + this.clientAppStore && + this.clientInstanceStore + ) { + const uniqueRegistrations = Object.values(this.seenClients); + const uniqueApps = Object.values( + uniqueRegistrations.reduce((soFar, reg) => { + soFar[reg.appName] = reg; + return soFar; + }, {}), + ); + this.seenClients = {}; + try { + if (uniqueRegistrations.length > 0) { + await this.clientAppStore.bulkUpsert(uniqueApps); + await this.clientInstanceStore.bulkUpsert( + uniqueRegistrations, + ); + } else { + this.logger.debug('No registrations in last time period'); + } + } catch (err) { + this.logger.warn('Failed to register clients', err); } } } - async registerClient(data, clientIp) { - const value = await clientRegisterSchema.validateAsync(data); - value.clientIp = clientIp; - await this.upsertApp(value, clientIp); - await this.clientInstanceStore.insert(value); - this.logger.info( - `New client registration: appName=${value.appName}, instanceId=${value.instanceId}`, - ); + appToEvent(app) { + return { + type: APPLICATION_CREATED, + createdBy: app.clientIp, + data: app, + }; } getAppsWithToggles() { diff --git a/src/migrations/20210302080040-add-pk-to-client-instances.js b/src/migrations/20210302080040-add-pk-to-client-instances.js new file mode 100644 index 0000000000..eecf9f887d --- /dev/null +++ b/src/migrations/20210302080040-add-pk-to-client-instances.js @@ -0,0 +1,20 @@ +'use strict'; + +exports.up = function(db, cb) { + db.runSql( + ` + ALTER TABLE client_instances ADD PRIMARY KEY (app_name, instance_id); + `, + cb, + ); +}; + +exports.down = function(db, cb) { + db.runSql( + ` + ALTER TABLE client_instances DROP CONSTRAINT client_instances_pkey; + DROP INDEX client_instance_pkey + `, + cb, + ); +}; diff --git a/src/test/e2e/api/client/metrics.e2e.test.js b/src/test/e2e/api/client/metrics.e2e.test.js index 0fbf14bc6c..c15924691a 100644 --- a/src/test/e2e/api/client/metrics.e2e.test.js +++ b/src/test/e2e/api/client/metrics.e2e.test.js @@ -38,44 +38,6 @@ test.serial('should require valid send metrics', async t => { .expect(400); }); -test.serial('should register client', async t => { - t.plan(0); - const request = await setupApp(stores); - return request - .post('/api/client/register') - .send({ - appName: 'demo', - instanceId: 'test', - strategies: ['default'], - started: Date.now(), - interval: 10, - }) - .expect(202); -}); - -test.serial('should allow client to register multiple times', async t => { - t.plan(0); - const request = await setupApp(stores); - const clientRegistration = { - appName: 'multipleRegistration', - instanceId: 'test', - strategies: ['default', 'another'], - started: Date.now(), - interval: 10, - }; - - return request - .post('/api/client/register') - .send(clientRegistration) - .expect(202) - .then(() => - request - .post('/api/client/register') - .send(clientRegistration) - .expect(202), - ); -}); - test.serial('should accept client metrics', async t => { t.plan(0); const request = await setupApp(stores); diff --git a/src/test/e2e/api/client/register.e2e.test.js b/src/test/e2e/api/client/register.e2e.test.js new file mode 100644 index 0000000000..dbd4997731 --- /dev/null +++ b/src/test/e2e/api/client/register.e2e.test.js @@ -0,0 +1,111 @@ +'use strict'; + +const test = require('ava'); +const sinon = require('sinon'); +const faker = require('faker'); +const { setupApp } = require('../../helpers/test-helper'); +const dbInit = require('../../helpers/database-init'); +const getLogger = require('../../../fixtures/no-logger'); +const version = require('../../../../lib/util/version'); + +const asyncFilter = async (arr, predicate) => { + const results = await Promise.all(arr.map(predicate)); + + return arr.filter((_v, index) => results[index]); +}; + +let stores; + +test.before(async () => { + const db = await dbInit('register_client', getLogger); + stores = db.stores; +}); + +test.after(async () => { + await stores.db.destroy(); +}); + +test.serial('should register client', async t => { + t.plan(0); + const request = await setupApp(stores); + return request + .post('/api/client/register') + .send({ + appName: 'demo', + instanceId: 'test', + strategies: ['default'], + started: Date.now(), + interval: 10, + }) + .expect(202); +}); + +test.serial('should allow client to register multiple times', async t => { + t.plan(2); + const clock = sinon.useFakeTimers(); + const { clientInstanceStore, clientApplicationsStore } = stores; + const request = await setupApp(stores); + const clientRegistration = { + appName: 'multipleRegistration', + instanceId: 'test', + strategies: ['default', 'another'], + started: Date.now(), + interval: 10, + }; + + await request + .post('/api/client/register') + .send(clientRegistration) + .expect(202) + .then(() => + request + .post('/api/client/register') + .send(clientRegistration) + .expect(202), + ); + await clock.tickAsync(6 * 1000); + t.assert(clientApplicationsStore.exists(clientRegistration)); + t.assert(clientInstanceStore.exists(clientRegistration)); + clock.restore(); +}); + +test.serial.skip('Should handle a massive bulk registration', async t => { + const { clientInstanceStore, clientApplicationsStore } = stores; + const request = await setupApp(stores); + const clients = []; + while (clients.length < 2000) { + const clientRegistration = { + appName: faker.internet.domainName(), + instanceId: faker.random.uuid(), + strategies: ['default'], + started: Date.now(), + interval: faker.random.number(), + sdkVersion: version, + icon: '', + description: faker.company.catchPhrase(), + color: faker.internet.color(), + }; + clients.push(clientRegistration); + // eslint-disable-next-line no-await-in-loop + await request + .post('/api/client/register') + .send(clientRegistration) + .expect(202); + } + t.is(clients.length, 2000); + await new Promise(res => setTimeout(res, 5500)); + + // Verify clientInstance + const notSavedInstance = await asyncFilter(clients, async c => { + const exists = await clientInstanceStore.exists(c); + return !exists; + }); + t.is(notSavedInstance.length, 0); + + // Verify application + const notSavedApp = await asyncFilter(clients, async c => { + const exists = await clientApplicationsStore.exists(c); + return !exists; + }); + t.is(notSavedApp.length, 0); +}); diff --git a/src/test/e2e/stores/event-store.e2e.test.js b/src/test/e2e/stores/event-store.e2e.test.js new file mode 100644 index 0000000000..882894e6ce --- /dev/null +++ b/src/test/e2e/stores/event-store.e2e.test.js @@ -0,0 +1,56 @@ +'use strict'; + +const test = require('ava'); +const sinon = require('sinon'); +const { APPLICATION_CREATED } = require('../../../lib/event-type'); + +const dbInit = require('../helpers/database-init'); +const getLogger = require('../../fixtures/no-logger'); + +let stores; +let eventStore; + +test.before(async () => { + const db = await dbInit('event_store_serial', getLogger); + stores = db.stores; + eventStore = stores.eventStore; +}); + +test.after(async () => { + await stores.db.destroy(); +}); + +test.serial('Should be able to store multiple events at once', async t => { + const clock = sinon.useFakeTimers(); + const event1 = { + type: APPLICATION_CREATED, + createdBy: '127.0.0.1', + data: { + clientIp: '127.0.0.1', + appName: 'test1', + }, + }; + const event2 = { + type: APPLICATION_CREATED, + createdBy: '127.0.0.1', + data: { + clientIp: '127.0.0.1', + appName: 'test2', + }, + }; + const event3 = { + type: APPLICATION_CREATED, + createdBy: '127.0.0.1', + data: { + clientIp: '127.0.0.1', + appName: 'test3', + }, + tags: [{ type: 'simple', value: 'mytest' }], + }; + let seen = 0; + eventStore.on(APPLICATION_CREATED, () => seen++); + await eventStore.batchStore([event1, event2, event3]); + await clock.tickAsync(100); + t.is(seen, 3); + clock.restore(); +});