Upload to DB works.

This commit is contained in:
Laur Ivan 2022-03-16 15:07:48 +01:00
parent 08676075f8
commit 7b2b01a312
4 changed files with 106 additions and 19 deletions

View File

@ -1,6 +1,7 @@
{ {
"cSpell.words": [ "cSpell.words": [
"appwrite", "appwrite",
"loglevel" "loglevel",
"upsert"
] ]
} }

21
src/definitions.ts Normal file
View File

@ -0,0 +1,21 @@
import { DPT } from 'dptlib/lib/definitions'
/**
* All metrics have the same structure.
* The only difference id the type of data accepted
*/
export interface MetricDocument {
address_id: string
dpt: string
timestamp: number
value: number | boolean | string
}
/**
*
*/
export interface DPTAndAddressID {
dpt: DPT
id: string
addressId: string
}

View File

@ -9,8 +9,9 @@ import knx from 'knx';
import { database } from './appwrite' import { database } from './appwrite'
import dotenv from 'dotenv' import dotenv from 'dotenv'
import { DPT } from 'dptlib/lib/definitions'; import { DPT } from 'dptlib/lib/definitions';
import { getDPTforAddress } from './processTelegram'; import { getDPTforAddress, upsert } from './processTelegram';
import { mainModule } from 'process'; import { MetricDocument } from './definitions'
import { DPTAndAddressID } from './definitions';
// read the addresses and DPTs. // read the addresses and DPTs.
// let addresses = readAddresses("") // let addresses = readAddresses("")
@ -21,7 +22,7 @@ dotenv.config({ override: false })
// TODO: Automatic load addresses to DB // TODO: Automatic load addresses to DB
async function verify() { async function verify() {
getDPTforAddress({ dest_addr: '3/5/6' }) getDPTforAddress('3/5/6')
} }
verify().then(() => console.log("---")).catch(e => console.log('Error: ', e)) verify().then(() => console.log("---")).catch(e => console.log('Error: ', e))
@ -30,11 +31,20 @@ verify().then(() => console.log("---")).catch(e => console.log('Error: ', e))
async function main(evt: string, src: string, dest: string, value: Buffer) { async function main(evt: string, src: string, dest: string, value: Buffer) {
const dpt: DPT = (await getDPTforAddress({ dest_addr: dest })).dpt const dpt: DPTAndAddressID = (await getDPTforAddress(dest))
const document: MetricDocument = {
address_id: dpt.addressId,
dpt: dpt.id,
timestamp: new Date().getTime(),
value: dpt.dpt.decoder(value)
}
upsert(document)
console.log("%s **** KNX EVENT: %j, src: %j, dest: %j, value: %j (DPT: %j}", console.log("%s **** KNX EVENT: %j, src: %j, dest: %j, value: %j (DPT: %j}",
new Date().toISOString().replace(/T/, ' ').replace(/\..+/, ''), new Date().toISOString().replace(/T/, ' ').replace(/\..+/, ''),
evt, src, dest, dpt.decoder(value)); evt, src, dest, document);
} }
// Create the connection // Create the connection
@ -48,11 +58,7 @@ const connection = new knx.Connection({
console.log('Connected!'); console.log('Connected!');
}, },
event: function (evt: string, src: string, dest: string, value: Buffer) { event: function (evt: string, src: string, dest: string, value: Buffer) {
main(evt, src, dest, value).then(() => { }).catch(e => console.log('Error: ', e)) main(evt, src, dest, value).then(() => { }).catch(e => console.log('Error: ', e))
//readMessage(value,new Date(), )
} }
} }
}); });

View File

@ -3,6 +3,11 @@ import { Client, Database, Query } from 'node-appwrite'
import { DataPointType } from 'dptlib/lib/DataPointType' import { DataPointType } from 'dptlib/lib/DataPointType'
import { DPT } from 'dptlib/lib/definitions' import { DPT } from 'dptlib/lib/definitions'
import { database } from './appwrite' import { database } from './appwrite'
import type { Models } from 'node-appwrite'
import { DPTAndAddressID, MetricDocument } from './definitions'
var logger = require('log-driver').logger;
dotenv.config({ override: false }) dotenv.config({ override: false })
@ -13,11 +18,15 @@ const dptMap: { [index: string]: string } = {
'DPT9.1': '61f46c7fc8869fa2c6a1' // temperature 'DPT9.1': '61f46c7fc8869fa2c6a1' // temperature
} }
// TODO: add types /**
export async function getDPTforAddress(record: any): Promise<any> { *
* @param destinationAddress the KNX address for which we want the DPT
* @returns a @type{DPTAndAddressID} containing the DPT object and the appwrite id of the address
*/
export async function getDPTforAddress(destinationAddress: string): Promise<DPTAndAddressID> {
let list = await database.listDocuments(process.env.APPWRITE_KNX_ADDRESS_COLLECTION, [ let list = await database.listDocuments(process.env.APPWRITE_KNX_ADDRESS_COLLECTION, [
Query.equal('project_id', process.env.APPWRITE_CONFIGURATION_ID), Query.equal('project_id', process.env.APPWRITE_CONFIGURATION_ID),
Query.equal('address', record.dest_addr) Query.equal('address', destinationAddress)
]); ]);
// Throw an error if the address is not unique // Throw an error if the address is not unique
@ -29,13 +38,63 @@ export async function getDPTforAddress(record: any): Promise<any> {
// TODO: Move thematcher into the DPT lib as a static function // TODO: Move thematcher into the DPT lib as a static function
const m = ((list.documents[0] as any).dpt).toUpperCase().match(/(?:DPT)?(\d+)(\.(\d+))?/); const m = ((list.documents[0] as any).dpt).toUpperCase().match(/(?:DPT)?(\d+)(\.(\d+))?/);
record.dpt = DataPointType.TYPES[`DPT${m[1]}`] const result: DPTAndAddressID = {
record.addressId = list.documents[0]['$id'] dpt: DataPointType.TYPES[`DPT${m[1]}`],
return record id: m[0],
addressId: list.documents[0]['$id']
}
return result
} }
interface SumListDocuments {
sum: number
}
export async function upsert(record: MetricDocument): Promise<any> {
let collectionId = (record.dpt in dptMap) ? dptMap[record.dpt] : undefined
// Soft return in case of invalid processor
//
if (collectionId === undefined) {
logger.debug('Cannot process [%s] with value %j', record.dpt, record)
return;
}
// Upsert
const list_items = await database.listDocuments(collectionId, [
Query.equal('dpt', record.dpt),
Query.equal('timestamp', record.timestamp)
])
const count = ((list_items as unknown) as SumListDocuments).sum
if (count === 0) {
logger.debug('insert')
await database.createDocument(collectionId, 'unique()', record);
} else if (count === 1) {
var doc = list_items.documents[0] as any
if (
doc.address_id !== record.address_id &&
doc.dpt !== record.dpt &&
doc.timestamp !== record.timestamp &&
doc.value !== record.value
) {
logger.log('update')
await database.updateDocument(collectionId, doc['$id'], record);
} else {
logger.debug('Skip %s', doc['$id'])
}
} else {
logger.error('%j', list_items)
throw 'Multiple entries!'
}
}
// TODO: Add types // TODO: Add types
export async function upsert(entry: any) { export async function upsert1(entry: any) {
let dptId: string = entry.dpt.id + '.' + entry.dpt.subtypeid let dptId: string = entry.dpt.id + '.' + entry.dpt.subtypeid
let collectionId = (dptId in dptMap) ? dptMap[dptId] : undefined let collectionId = (dptId in dptMap) ? dptMap[dptId] : undefined
@ -50,11 +109,11 @@ export async function upsert(entry: any) {
Query.equal('timestamp', entry.timestamp.getTime()) Query.equal('timestamp', entry.timestamp.getTime())
]) ])
let data = { let data: MetricDocument = {
address_id: entry.addressId, address_id: entry.addressId,
dpt: dptId, dpt: dptId,
timestamp: entry.timestamp.getTime(), timestamp: entry.timestamp.getTime(),
value: entry.apdu.dptData value: entry.apdu.dptData,
} }
if (list_items.total === 0) { if (list_items.total === 0) {