knx-monitor/structure.py
2022-12-19 12:24:47 +01:00

106 lines
2.8 KiB
Python

import csv
import json
import json_fix
import xknx.remote_value
import xknx
from xknx.dpt import DPT2ByteFloat, DPTBinary, DPTTime, DPTValue1ByteUnsigned
from datetime import datetime
class DPST:
    """Datapoint type identifier parsed from a dash-separated string.

    The input is split on ``-``; field 0 is the textual prefix, field 1 the
    main type number and field 2 the subtype number, e.g. ``"DPST-9-1"``
    gives ``major == "9"`` and ``minor == "1"``.  Both parts are kept as
    strings exactly as they appear in the input.

    Identifiers that carry only a main type (``"DPT-5"``) or nothing at all
    (``""``) are accepted too: the missing parts are stored as ``None``
    instead of raising ``IndexError``.
    """

    def __init__(self, str):
        # NOTE: the parameter shadows the builtin ``str``; the name is kept so
        # that callers passing it by keyword keep working.
        vals = str.split('-') if str else []
        self.major = vals[1] if len(vals) > 1 else None
        self.minor = vals[2] if len(vals) > 2 else None

    def __str__(self):
        """Return a human-readable form such as ``DPST 9.1``."""
        if self.major is None:
            return "DPT unknown"
        if self.minor is None:
            # Main type only: there is no subtype to print.
            return f"DPT {self.major}"
        return f"DPST {self.major}.{self.minor}"

    def __json__(self, **options):
        """Return the instance attributes as the JSON representation."""
        return self.__dict__
class Endpoint:
    """One row of the topology CSV, built positionally from its columns.

    The first seven columns are name, address, central, unfiltered,
    description, dpt and security; any further positional or keyword
    arguments are accepted and ignored.  ``dpt`` is parsed into a ``DPST``
    only when the row is an endpoint (see ``is_endpoint``); otherwise it is
    ``None``.
    """

    def __init__(self, name, address, central, unfiltered, description, dpt, security, *args, **kwargs):
        # Attribute assignment order is kept stable because __json__ exposes
        # the instance dict directly.
        self.name = name
        self.address = address
        self.central = central
        self.unfiltered = unfiltered
        self.description = description
        self.dpt = DPST(dpt) if self.is_endpoint() else None
        self.security = security

    def is_endpoint(self):
        """Return True unless the address ends with a ``-`` placeholder."""
        final_char = self.address[-1]
        return final_char != '-'

    def __str__(self):
        """Return a short summary with name, address and datapoint type."""
        return f"name: {self.name}, address: {self.address}, dpt: {self.dpt}"

    def __json__(self, **options):
        """Return the instance attributes as the JSON representation."""
        return vars(self)
def init_value_mapping():
    """Build the table that maps a DPT main number to its decoder settings.

    Returns:
        A dict keyed by the main type number as a string.  Each entry holds
        ``"value"`` (the xknx class that ``parse_message`` calls
        ``from_knx`` on) and ``"type"`` (``"number"`` or ``"time"``;
        ``parse_message`` converts ``"time"`` values to seconds).
    """
    # NOTE(review): parse_message calls ``from_knx`` on every class listed
    # here -- confirm that DPTBinary provides it in the xknx version in use.
    return {
        '1': {"value": DPTBinary, "type": "number"},
        '9': {"value": DPT2ByteFloat, "type": "number"},
        '10': {"value": DPTTime, "type": "time"},
        '5': {"value": DPTValue1ByteUnsigned, "type": "number"},
    }
def load_structure(filename):
    """Load the topology CSV into a dict of ``Endpoint`` objects.

    The file is read as ``;``-separated values with ``"`` as the quote
    character.  The first row is treated as a header and skipped; blank
    rows are ignored.  Every remaining row is unpacked positionally into
    ``Endpoint``.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A dict mapping each row's address string to its ``Endpoint``.  Rows
        that are not endpoints are included as well.
    """
    config = {}
    with open(filename, newline='') as csvFile:
        reader = csv.reader(csvFile, delimiter=';', quotechar='"')
        next(reader, None)  # skip the header row (no-op on an empty file)
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines; there is nothing to
                # unpack into Endpoint.
                continue
            c = Endpoint(*row)
            # c.dpt is a parsed object (or None), never a plain string, so
            # look at its main type number to detect a missing DPT.
            if c.is_endpoint() and not getattr(c.dpt, 'major', None):
                print(f"Unknown DPT format for {c.address} ({c.name})")
            config[c.address] = c
    return config
def parse_message(structure, mapping, telegram):
    """Decode a telegram into a plain dict using the loaded topology.

    Args:
        structure: Dict keyed by address string; each value provides
            ``name``, ``description`` and ``dpt`` (with a ``major`` field).
        mapping: Dict keyed by DPT main number; each entry has a ``"value"``
            decoder exposing ``from_knx`` and a ``"type"`` string.
        telegram: Object providing ``destination_address`` and
            ``payload.value.value``.

    Returns:
        A dict with ``timestamp``, ``value``, ``destination``, ``name``,
        ``description`` and ``type``, or ``None`` when the address is not in
        ``structure`` or decoding fails (the failure is printed).
    """
    destination = f"{telegram.destination_address}"
    if destination not in structure:
        print(f"Unrecognised address {destination}. ignoring")
        return None

    endpoint = structure[destination]
    try:
        entry = mapping[endpoint.dpt.major]
        decoded = entry["value"].from_knx(telegram.payload.value.value)
        if entry["type"] == "time":
            # Time values are reported as seconds since midnight.
            decoded = decoded.tm_hour * 3600 + decoded.tm_min * 60 + decoded.tm_sec
        return {
            "timestamp": datetime.now().timestamp(),
            "value": decoded,
            "destination": destination,
            "name": endpoint.name,
            "description": endpoint.description,
            "type": f"{endpoint.dpt}",
        }
    except Exception as err:
        # Best effort: any lookup or decode failure is reported and dropped.
        print(f"Unmapped {endpoint.dpt}")
        print(err)
    return None
if __name__ == '__main__':
    # Manual smoke test: load the topology named by the KNX_TOPOLOGY
    # environment variable (optionally supplied through a .env file next to
    # this module) and dump one known group address as JSON.
    #
    # os, join and dirname are imported here because the module itself does
    # not import them and only this script section needs them.
    import os
    from os.path import dirname, join

    from dotenv import load_dotenv

    dotenv_path = join(dirname(__file__), '.env')
    load_dotenv(dotenv_path)

    topology_path = os.environ.get('KNX_TOPOLOGY')
    if not topology_path:
        # Without this guard open() would be called with None.
        raise SystemExit("KNX_TOPOLOGY is not set; point it at the topology CSV file")

    config = load_structure(topology_path)
    # Serialising Endpoint relies on the __json__ hook (json_fix import).
    print(json.dumps(config["6/4/0"], indent=2))