mirror of
https://github.com/Unleash/unleash.git
synced 2025-04-06 01:15:28 +02:00
parent
31082e5227
commit
5c795bdaa7
@ -12,6 +12,8 @@ const EVENT_COLUMNS = [
|
|||||||
'tags',
|
'tags',
|
||||||
];
|
];
|
||||||
|
|
||||||
|
const TABLE = 'events';
|
||||||
|
|
||||||
class EventStore extends EventEmitter {
|
class EventStore extends EventEmitter {
|
||||||
constructor(db, getLogger) {
|
constructor(db, getLogger) {
|
||||||
super();
|
super();
|
||||||
@ -21,13 +23,16 @@ class EventStore extends EventEmitter {
|
|||||||
|
|
||||||
async store(event) {
|
async store(event) {
|
||||||
try {
|
try {
|
||||||
await this.db('events').insert({
|
const rows = await this.db(TABLE)
|
||||||
|
.insert({
|
||||||
type: event.type,
|
type: event.type,
|
||||||
created_by: event.createdBy, // eslint-disable-line
|
created_by: event.createdBy, // eslint-disable-line
|
||||||
data: event.data,
|
data: event.data,
|
||||||
tags: event.tags ? JSON.stringify(event.tags) : [],
|
tags: event.tags ? JSON.stringify(event.tags) : [],
|
||||||
});
|
})
|
||||||
process.nextTick(() => this.emit(event.type, event));
|
.returning(EVENT_COLUMNS);
|
||||||
|
const savedEvent = this.rowToEvent(rows[0]);
|
||||||
|
process.nextTick(() => this.emit(event.type, savedEvent));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.logger.warn(`Failed to store event ${e}`);
|
this.logger.warn(`Failed to store event ${e}`);
|
||||||
}
|
}
|
||||||
@ -35,27 +40,23 @@ class EventStore extends EventEmitter {
|
|||||||
|
|
||||||
async batchStore(events) {
|
async batchStore(events) {
|
||||||
try {
|
try {
|
||||||
await this.db('events').insert(events.map(this.eventToDbRow));
|
const savedRows = await this.db(TABLE)
|
||||||
process.nextTick(() => events.forEach(e => this.emit(e.type, e)));
|
.insert(events.map(this.eventToDbRow))
|
||||||
|
.returning(EVENT_COLUMNS);
|
||||||
|
const savedEvents = savedRows.map(this.rowToEvent);
|
||||||
|
process.nextTick(() =>
|
||||||
|
savedEvents.forEach(e => this.emit(e.type, e)),
|
||||||
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.logger.warn('Failed to store events');
|
this.logger.warn('Failed to store events');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
eventToDbRow(e) {
|
|
||||||
return {
|
|
||||||
type: e.type,
|
|
||||||
created_by: e.createdBy,
|
|
||||||
data: e.data,
|
|
||||||
tags: e.tags ? JSON.stringify(e.tags) : [],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
async getEvents() {
|
async getEvents() {
|
||||||
try {
|
try {
|
||||||
const rows = await this.db
|
const rows = await this.db
|
||||||
.select(EVENT_COLUMNS)
|
.select(EVENT_COLUMNS)
|
||||||
.from('events')
|
.from(TABLE)
|
||||||
.limit(100)
|
.limit(100)
|
||||||
.orderBy('created_at', 'desc');
|
.orderBy('created_at', 'desc');
|
||||||
|
|
||||||
@ -69,7 +70,7 @@ class EventStore extends EventEmitter {
|
|||||||
try {
|
try {
|
||||||
const rows = await this.db
|
const rows = await this.db
|
||||||
.select(EVENT_COLUMNS)
|
.select(EVENT_COLUMNS)
|
||||||
.from('events')
|
.from(TABLE)
|
||||||
.limit(100)
|
.limit(100)
|
||||||
.whereRaw("data ->> 'name' = ?", [name])
|
.whereRaw("data ->> 'name' = ?", [name])
|
||||||
.andWhere(
|
.andWhere(
|
||||||
@ -77,7 +78,7 @@ class EventStore extends EventEmitter {
|
|||||||
'>=',
|
'>=',
|
||||||
this.db
|
this.db
|
||||||
.select(this.db.raw('coalesce(max(id),0) as id'))
|
.select(this.db.raw('coalesce(max(id),0) as id'))
|
||||||
.from('events')
|
.from(TABLE)
|
||||||
.where({ type: DROP_FEATURES }),
|
.where({ type: DROP_FEATURES }),
|
||||||
)
|
)
|
||||||
.orderBy('created_at', 'desc');
|
.orderBy('created_at', 'desc');
|
||||||
@ -97,6 +98,15 @@ class EventStore extends EventEmitter {
|
|||||||
tags: row.tags,
|
tags: row.tags,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventToDbRow(e) {
|
||||||
|
return {
|
||||||
|
type: e.type,
|
||||||
|
created_by: e.createdBy,
|
||||||
|
data: e.data,
|
||||||
|
tags: e.tags ? JSON.stringify(e.tags) : [],
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = EventStore;
|
module.exports = EventStore;
|
||||||
|
@ -19,6 +19,26 @@ test.before(async () => {
|
|||||||
test.after(async () => {
|
test.after(async () => {
|
||||||
await stores.db.destroy();
|
await stores.db.destroy();
|
||||||
});
|
});
|
||||||
|
test.serial('Should include id and createdAt when saving', async t => {
|
||||||
|
const clock = sinon.useFakeTimers();
|
||||||
|
const event1 = {
|
||||||
|
type: APPLICATION_CREATED,
|
||||||
|
createdBy: '127.0.0.1',
|
||||||
|
data: {
|
||||||
|
clientIp: '127.0.0.1',
|
||||||
|
appName: 'test1',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const seen = [];
|
||||||
|
eventStore.on(APPLICATION_CREATED, e => seen.push(e));
|
||||||
|
await eventStore.store(event1);
|
||||||
|
await clock.tickAsync(100);
|
||||||
|
t.is(seen.length, 1);
|
||||||
|
t.truthy(seen[0].id);
|
||||||
|
t.truthy(seen[0].createdAt);
|
||||||
|
t.is(seen[0].data.clientIp, event1.data.clientIp);
|
||||||
|
t.is(seen[0].data.appName, event1.data.appName);
|
||||||
|
});
|
||||||
|
|
||||||
test.serial('Should be able to store multiple events at once', async t => {
|
test.serial('Should be able to store multiple events at once', async t => {
|
||||||
const clock = sinon.useFakeTimers();
|
const clock = sinon.useFakeTimers();
|
||||||
@ -47,10 +67,14 @@ test.serial('Should be able to store multiple events at once', async t => {
|
|||||||
},
|
},
|
||||||
tags: [{ type: 'simple', value: 'mytest' }],
|
tags: [{ type: 'simple', value: 'mytest' }],
|
||||||
};
|
};
|
||||||
let seen = 0;
|
const seen = [];
|
||||||
eventStore.on(APPLICATION_CREATED, () => seen++);
|
eventStore.on(APPLICATION_CREATED, e => seen.push(e));
|
||||||
await eventStore.batchStore([event1, event2, event3]);
|
await eventStore.batchStore([event1, event2, event3]);
|
||||||
await clock.tickAsync(100);
|
await clock.tickAsync(100);
|
||||||
t.is(seen, 3);
|
t.is(seen.length, 3);
|
||||||
|
seen.forEach(e => {
|
||||||
|
t.truthy(e.id);
|
||||||
|
t.truthy(e.createdAt);
|
||||||
|
});
|
||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user