blakeblackshear.frigate/frigate/stats/emitter.py

91 lines
2.5 KiB
Python
Raw Normal View History

"""Emit stats to listeners."""
import itertools
import json
import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.stats.util import stats_snapshot
from frigate.types import StatsTrackingTypes
logger = logging.getLogger(__name__)
MAX_STATS_POINTS = 120
class StatsEmitter(threading.Thread):
def __init__(
self,
config: FrigateConfig,
stats_tracking: StatsTrackingTypes,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "frigate_stats_emitter"
self.config = config
self.stats_tracking = stats_tracking
self.stop_event = stop_event
self.hwaccel_errors: list[str] = []
self.stats_history: list[dict[str, any]] = []
# create communication for stats
self.requestor = InterProcessRequestor()
def get_latest_stats(self) -> dict[str, any]:
"""Get latest stats."""
if len(self.stats_history) > 0:
return self.stats_history[-1]
else:
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
return stats
def get_stats_history(
self, keys: Optional[list[str]] = None
) -> list[dict[str, any]]:
"""Get stats history."""
if not keys:
return self.stats_history
selected_stats: list[dict[str, any]] = []
for s in self.stats_history:
selected = {}
for k in keys:
selected[k] = s.get(k)
selected_stats.append(selected)
return selected_stats
def run(self) -> None:
time.sleep(10)
for counter in itertools.cycle(
range(int(self.config.mqtt.stats_interval / 10))
):
if self.stop_event.wait(10):
break
logger.debug("Starting stats collection")
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
self.stats_history = self.stats_history[-MAX_STATS_POINTS:]
if counter == 0:
self.requestor.send_data("stats", json.dumps(stats))
logger.debug("Finished stats collection")
logger.info("Exiting stats emitter...")