import csv import json import json_fix import xknx.remote_value import xknx from xknx.dpt import DPT2ByteFloat, DPTBinary, DPTTime, DPTValue1ByteUnsigned from datetime import datetime class DPST: def __init__(self, str): vals = str.split('-') self.major = vals[1] self.minor = vals[2] def __str__(self): return f"DPST {self.major}.{self.minor}" def __json__(self, **options): return self.__dict__ class Endpoint: def __init__(self, name, address, central, unfiltered, description, dpt, security, *args, **kwargs): self.name = name self.address = address self.central = central self.unfiltered = unfiltered self.description = description if self.is_endpoint(): self.dpt = DPST(dpt) else: self.dpt = None self.security = security def is_endpoint(self): return self.address[-1] != '-' def __str__(self): return f"name: {self.name}, address: {self.address}, dpt: {self.dpt}" def __json__(self, **options): return self.__dict__ def init_value_mapping(): config = {} config['1'] = { "value": DPTBinary, "type": "number" } config['9'] = { "value": DPT2ByteFloat, "type": "number" } config['10'] = { "value": DPTTime, "type": "time" } config['5'] = { "value": DPTValue1ByteUnsigned, "type": "number" } return config def load_structure(filename): print(f"Configuration file: {filename}") config = {} with open(filename, newline='') as csvFile: line = 0 reader = csv.reader(csvFile, delimiter=';', quotechar='"') for row in reader: line += 1 if line == 1: continue c = Endpoint(*row) if c.is_endpoint(): if c.dpt == '': print(f"Unknown DPT format for {c.address} ({c.name})") config[c.address] = c return config def parse_message(structure, mapping, telegram): if f"{telegram.destination_address}" not in structure: print(f"Unrecognised address {telegram.destination_address}. ignoring") return hit = structure[f"{telegram.destination_address}"] try: value = mapping[hit.dpt.major]["value"].from_knx(telegram.payload.value.value) # Convert the value to something else if needed # if mapping[hit.dpt.major]["type"] == "time": value = value.tm_hour*3600+value.tm_min*60 +value.tm_sec return { "timestamp": datetime.now().timestamp(), "value": value, "destination": f"{telegram.destination_address}", "name": hit.name, "description": hit.description, "type": f"{hit.dpt}" } except Exception as e: print(f"Unmapped {hit.dpt}") print (e) if __name__ == '__main__': from os.path import dirname, join from dotenv import load_dotenv dotenv_path = join(dirname(__file__), '.env') load_dotenv(dotenv_path) config = load_structure(os.environ.get('KNX_TOPOLOGY')) print(json.dumps(config["6/4/0"], indent=2))