"""Collect runtime statistics and periodically publish them over MQTT."""

import json
import logging
import threading
import time

from frigate.config import FrigateConfig
from frigate.version import VERSION

logger = logging.getLogger(__name__)


def stats_init(camera_metrics, detectors):
    """Build the tracking state that ``stats_snapshot`` later reads.

    Args:
        camera_metrics: Mapping of camera name to that camera's metrics.
        detectors: Mapping of detector name to detector object.

    Returns:
        A dict holding both mappings plus ``'started'``, the current time
        as whole seconds since the epoch (used to compute uptime).
    """
    stats_tracking = {
        'camera_metrics': camera_metrics,
        'detectors': detectors,
        'started': int(time.time()),
    }
    return stats_tracking


def stats_snapshot(stats_tracking):
    """Return a plain dict describing the current camera/detector stats.

    Args:
        stats_tracking: The dict produced by ``stats_init``.

    Returns:
        A dict with one entry per camera name, plus the keys
        ``'detectors'``, ``'detection_fps'`` (sum over all cameras) and
        ``'service'`` (uptime in seconds and version).
    """
    camera_metrics = stats_tracking['camera_metrics']
    stats = {}

    total_detection_fps = 0
    for name, camera_stats in camera_metrics.items():
        # Read the value once so the per-camera figure and the total are
        # derived from the same sample.
        # NOTE(review): the ``.value`` attributes are presumably shared
        # multiprocessing values updated by other processes -- confirm.
        detection_fps = camera_stats['detection_fps'].value
        total_detection_fps += detection_fps
        stats[name] = {
            'camera_fps': round(camera_stats['camera_fps'].value, 2),
            'process_fps': round(camera_stats['process_fps'].value, 2),
            'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
            'detection_fps': round(detection_fps, 2),
            'pid': camera_stats['process'].pid,
            'capture_pid': camera_stats['capture_process'].pid,
        }

    stats['detectors'] = {}
    for name, detector in stats_tracking['detectors'].items():
        stats['detectors'][name] = {
            # NOTE(review): the * 1000 presumably converts seconds to
            # milliseconds -- confirm against the detector implementation.
            'inference_speed': round(
                detector.avg_inference_speed.value * 1000, 2
            ),
            'detection_start': detector.detection_start.value,
            'pid': detector.detect_process.pid,
        }
    stats['detection_fps'] = round(total_detection_fps, 2)

    stats['service'] = {
        'uptime': (int(time.time()) - stats_tracking['started']),
        'version': VERSION,
    }

    return stats


class StatsEmitter(threading.Thread):
    """Thread that publishes a stats snapshot to MQTT at a fixed interval."""

    def __init__(self, config: FrigateConfig, stats_tracking, mqtt_client,
                 topic_prefix, stop_event):
        """Store the collaborators used by ``run``.

        Args:
            config: Configuration; ``config.mqtt.stats_interval`` is the
                delay in seconds between publications.
            stats_tracking: The dict produced by ``stats_init``.
            mqtt_client: Object exposing ``publish(topic, payload, retain=...)``.
            topic_prefix: Prefix for the ``<prefix>/stats`` topic.
            stop_event: Event that, once set, makes the thread exit.
        """
        threading.Thread.__init__(self)
        self.name = 'frigate_stats_emitter'
        self.config = config
        self.stats_tracking = stats_tracking
        self.mqtt_client = mqtt_client
        self.topic_prefix = topic_prefix
        self.stop_event = stop_event

    def run(self):
        """Publish stats until ``stop_event`` is set.

        Waits 10 seconds before the first publication, then publishes every
        ``config.mqtt.stats_interval`` seconds. The delays are implemented
        with ``stop_event.wait(timeout)`` rather than ``time.sleep`` so a
        stop request interrupts the wait instead of being noticed only
        after the full interval has elapsed.
        """
        # ``wait`` returns True as soon as the event is set, False on timeout.
        stopped = self.stop_event.wait(10)
        while not stopped:
            stats = stats_snapshot(self.stats_tracking)
            self.mqtt_client.publish(
                f"{self.topic_prefix}/stats", json.dumps(stats), retain=False
            )
            stopped = self.stop_event.wait(self.config.mqtt.stats_interval)
        logger.info("Exiting watchdog...")