"""Emit stats to listeners.""" import itertools import json import logging import threading import time from multiprocessing.synchronize import Event as MpEvent from typing import Optional from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig from frigate.stats.util import stats_snapshot from frigate.types import StatsTrackingTypes logger = logging.getLogger(__name__) MAX_STATS_POINTS = 120 class StatsEmitter(threading.Thread): def __init__( self, config: FrigateConfig, stats_tracking: StatsTrackingTypes, stop_event: MpEvent, ): threading.Thread.__init__(self) self.name = "frigate_stats_emitter" self.config = config self.stats_tracking = stats_tracking self.stop_event = stop_event self.hwaccel_errors: list[str] = [] self.stats_history: list[dict[str, any]] = [] # create communication for stats self.requestor = InterProcessRequestor() def get_latest_stats(self) -> dict[str, any]: """Get latest stats.""" if len(self.stats_history) > 0: return self.stats_history[-1] else: stats = stats_snapshot( self.config, self.stats_tracking, self.hwaccel_errors ) self.stats_history.append(stats) return stats def get_stats_history( self, keys: Optional[list[str]] = None ) -> list[dict[str, any]]: """Get stats history.""" if not keys: return self.stats_history selected_stats: list[dict[str, any]] = [] for s in self.stats_history: selected = {} for k in keys: selected[k] = s.get(k) selected_stats.append(selected) return selected_stats def run(self) -> None: time.sleep(10) for counter in itertools.cycle( range(int(self.config.mqtt.stats_interval / 10)) ): if self.stop_event.wait(10): break logger.debug("Starting stats collection") stats = stats_snapshot( self.config, self.stats_tracking, self.hwaccel_errors ) self.stats_history.append(stats) self.stats_history = self.stats_history[-MAX_STATS_POINTS:] if counter == 0: self.requestor.send_data("stats", json.dumps(stats)) logger.debug("Finished stats collection") logger.info("Exiting stats emitter...")