blakeblackshear.frigate/frigate/stats/emitter.py

62 lines
2.0 KiB
Python
Raw Normal View History

"""Emit stats to listeners."""
import json
import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.stats.util import stats_snapshot
from frigate.types import StatsTrackingTypes
logger = logging.getLogger(__name__)
class StatsEmitter(threading.Thread):
def __init__(
self,
config: FrigateConfig,
stats_tracking: StatsTrackingTypes,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "frigate_stats_emitter"
self.config = config
self.stats_tracking = stats_tracking
self.stop_event = stop_event
self.hwaccel_errors: list[str] = []
self.stats_history: list[dict[str, any]] = []
# create communication for stats
self.requestor = InterProcessRequestor()
def get_latest_stats(self) -> dict[str, any]:
"""Get latest stats."""
if len(self.stats_history) > 0:
return self.stats_history[-1]
else:
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
return stats
def get_stats_history(self) -> list[dict[str, any]]:
"""Get stats history."""
return self.stats_history
def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(self.config.mqtt.stats_interval):
logger.debug("Starting stats collection")
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
self.stats_history = self.stats_history[-10:]
self.requestor.send_data("stats", json.dumps(stats))
logger.debug("Finished stats collection")
logger.info("Exiting stats emitter...")