From 7b2b01a3121c02866121f826e57e92f63ec78755 Mon Sep 17 00:00:00 2001 From: Laur Ivan Date: Wed, 16 Mar 2022 15:07:48 +0100 Subject: [PATCH] Upload to DB works. --- .vscode/settings.json | 3 +- src/definitions.ts | 21 ++++++++++++ src/index.ts | 24 ++++++++----- src/processTelegram.ts | 77 +++++++++++++++++++++++++++++++++++++----- 4 files changed, 106 insertions(+), 19 deletions(-) create mode 100644 src/definitions.ts diff --git a/.vscode/settings.json b/.vscode/settings.json index 508eee3..b16bc7d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,7 @@ { "cSpell.words": [ "appwrite", - "loglevel" + "loglevel", + "upsert" ] } \ No newline at end of file diff --git a/src/definitions.ts b/src/definitions.ts new file mode 100644 index 0000000..52043fd --- /dev/null +++ b/src/definitions.ts @@ -0,0 +1,21 @@ +import { DPT } from 'dptlib/lib/definitions' + +/** + * All metrics have the same structure. + * The only difference id the type of data accepted + */ +export interface MetricDocument { + address_id: string + dpt: string + timestamp: number + value: number | boolean | string +} + +/** + * + */ +export interface DPTAndAddressID { + dpt: DPT + id: string + addressId: string +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index c67b547..6dd527f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,8 +9,9 @@ import knx from 'knx'; import { database } from './appwrite' import dotenv from 'dotenv' import { DPT } from 'dptlib/lib/definitions'; -import { getDPTforAddress } from './processTelegram'; -import { mainModule } from 'process'; +import { getDPTforAddress, upsert } from './processTelegram'; +import { MetricDocument } from './definitions' +import { DPTAndAddressID } from './definitions'; // read the addresses and DPTs. // let addresses = readAddresses("") @@ -21,7 +22,7 @@ dotenv.config({ override: false }) // TODO: Automatic load addresses to DB async function verify() { - getDPTforAddress({ dest_addr: '3/5/6' }) + getDPTforAddress('3/5/6') } verify().then(() => console.log("---")).catch(e => console.log('Error: ', e)) @@ -30,11 +31,20 @@ verify().then(() => console.log("---")).catch(e => console.log('Error: ', e)) async function main(evt: string, src: string, dest: string, value: Buffer) { - const dpt: DPT = (await getDPTforAddress({ dest_addr: dest })).dpt + const dpt: DPTAndAddressID = (await getDPTforAddress(dest)) + + const document: MetricDocument = { + address_id: dpt.addressId, + dpt: dpt.id, + timestamp: new Date().getTime(), + value: dpt.dpt.decoder(value) + } + + upsert(document) console.log("%s **** KNX EVENT: %j, src: %j, dest: %j, value: %j (DPT: %j}", new Date().toISOString().replace(/T/, ' ').replace(/\..+/, ''), - evt, src, dest, dpt.decoder(value)); + evt, src, dest, document); } // Create the connection @@ -48,11 +58,7 @@ const connection = new knx.Connection({ console.log('Connected!'); }, event: function (evt: string, src: string, dest: string, value: Buffer) { - main(evt, src, dest, value).then(() => { }).catch(e => console.log('Error: ', e)) - - - //readMessage(value,new Date(), ) } } }); \ No newline at end of file diff --git a/src/processTelegram.ts b/src/processTelegram.ts index d8d58a5..08f5fa3 100644 --- a/src/processTelegram.ts +++ b/src/processTelegram.ts @@ -3,6 +3,11 @@ import { Client, Database, Query } from 'node-appwrite' import { DataPointType } from 'dptlib/lib/DataPointType' import { DPT } from 'dptlib/lib/definitions' import { database } from './appwrite' +import type { Models } from 'node-appwrite' +import { DPTAndAddressID, MetricDocument } from './definitions' + + +var logger = require('log-driver').logger; dotenv.config({ override: false }) @@ -13,11 +18,15 @@ const dptMap: { [index: string]: string } = { 'DPT9.1': '61f46c7fc8869fa2c6a1' // temperature } -// TODO: add types -export async function getDPTforAddress(record: any): Promise { +/** + * + * @param destinationAddress the KNX address for which we want the DPT + * @returns a @type{DPTAndAddressID} containing the DPT object and the appwrite id of the address + */ +export async function getDPTforAddress(destinationAddress: string): Promise { let list = await database.listDocuments(process.env.APPWRITE_KNX_ADDRESS_COLLECTION, [ Query.equal('project_id', process.env.APPWRITE_CONFIGURATION_ID), - Query.equal('address', record.dest_addr) + Query.equal('address', destinationAddress) ]); // Throw an error if the address is not unique @@ -29,13 +38,63 @@ export async function getDPTforAddress(record: any): Promise { // TODO: Move thematcher into the DPT lib as a static function const m = ((list.documents[0] as any).dpt).toUpperCase().match(/(?:DPT)?(\d+)(\.(\d+))?/); - record.dpt = DataPointType.TYPES[`DPT${m[1]}`] - record.addressId = list.documents[0]['$id'] - return record + const result: DPTAndAddressID = { + dpt: DataPointType.TYPES[`DPT${m[1]}`], + id: m[0], + addressId: list.documents[0]['$id'] + } + return result } +interface SumListDocuments { + sum: number +} + +export async function upsert(record: MetricDocument): Promise { + + let collectionId = (record.dpt in dptMap) ? dptMap[record.dpt] : undefined + + // Soft return in case of invalid processor + // + if (collectionId === undefined) { + logger.debug('Cannot process [%s] with value %j', record.dpt, record) + return; + } + + // Upsert + const list_items = await database.listDocuments(collectionId, [ + Query.equal('dpt', record.dpt), + Query.equal('timestamp', record.timestamp) + ]) + + const count = ((list_items as unknown) as SumListDocuments).sum + + if (count === 0) { + logger.debug('insert') + await database.createDocument(collectionId, 'unique()', record); + } else if (count === 1) { + var doc = list_items.documents[0] as any + if ( + doc.address_id !== record.address_id && + doc.dpt !== record.dpt && + doc.timestamp !== record.timestamp && + doc.value !== record.value + ) { + logger.log('update') + await database.updateDocument(collectionId, doc['$id'], record); + } else { + logger.debug('Skip %s', doc['$id']) + } + + } else { + logger.error('%j', list_items) + throw 'Multiple entries!' + } +} + + // TODO: Add types -export async function upsert(entry: any) { +export async function upsert1(entry: any) { let dptId: string = entry.dpt.id + '.' + entry.dpt.subtypeid let collectionId = (dptId in dptMap) ? dptMap[dptId] : undefined @@ -50,11 +109,11 @@ export async function upsert(entry: any) { Query.equal('timestamp', entry.timestamp.getTime()) ]) - let data = { + let data: MetricDocument = { address_id: entry.addressId, dpt: dptId, timestamp: entry.timestamp.getTime(), - value: entry.apdu.dptData + value: entry.apdu.dptData, } if (list_items.total === 0) {