blakeblackshear.frigate/frigate/stats/emitter.py
Martin Weinelt 4d4d54d030
Fix various typing issues (#18187)
* Fix the `Any` typing hint treewide

There has been confusion between the Any type[1] and the any function[2]
in typing hints.

[1] https://docs.python.org/3/library/typing.html#typing.Any
[2] https://docs.python.org/3/library/functions.html#any

* Fix typing for various frame_shape members

Frame shapes are most likely defined by height and width, so a single int
cannot express that.

* Wrap gpu stats functions in Optional[]

These can return `None`, so they need to be `Type | None`, which is what
`Optional` expresses very nicely.

* Fix return type in get_latest_segment_datetime

Returns a datetime object, not an integer.

* Make the return type of FrameManager.write optional

This is necessary since the SharedMemoryFrameManager.write function can
return None.

* Fix total_seconds() return type in get_tz_modifiers

The function returns a float, not an int.

https://docs.python.org/3/library/datetime.html#datetime.timedelta.total_seconds

* Account for floating point results in to_relative_box

Because the function uses division the return types may either be int or
float.

* Resolve ruff deprecation warning

The config has been split into formatter and linter, and the global
options are deprecated.
2025-05-13 08:27:20 -06:00

102 lines
2.9 KiB
Python

"""Emit stats to listeners."""
import itertools
import json
import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import FREQUENCY_STATS_POINTS
from frigate.stats.prometheus import update_metrics
from frigate.stats.util import stats_snapshot
from frigate.types import StatsTrackingTypes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Upper bound on the number of snapshots StatsEmitter keeps in its in-memory
# history; older entries are dropped after each new snapshot is appended.
MAX_STATS_POINTS = 80
class StatsEmitter(threading.Thread):
    """Background thread that samples stats and periodically publishes them.

    A snapshot is taken with ``stats_snapshot`` after each
    ``FREQUENCY_STATS_POINTS``-second wait on the stop event and appended to
    an in-memory history capped at ``MAX_STATS_POINTS`` entries. Every Nth
    snapshot, with ``N = int(config.mqtt.stats_interval /
    FREQUENCY_STATS_POINTS)`` (at least 1), is JSON-encoded and sent on the
    ``"stats"`` topic through an ``InterProcessRequestor``.
    """

    def __init__(
        self,
        config: FrigateConfig,
        stats_tracking: StatsTrackingTypes,
        stop_event: MpEvent,
    ) -> None:
        """Store the collaborators and create the inter-process requestor.

        Args:
            config: Frigate configuration. ``mqtt.stats_interval`` is read in
                ``run`` and the whole object is handed to ``stats_snapshot``.
            stats_tracking: Tracking state handed to ``stats_snapshot``.
            stop_event: Event whose being set ends the ``run`` loop.
        """
        super().__init__(name="frigate_stats_emitter")
        self.config = config
        self.stats_tracking = stats_tracking
        self.stop_event = stop_event
        # Handed to every stats_snapshot call alongside config/stats_tracking.
        self.hwaccel_errors: list[str] = []
        # Snapshots in the order they were taken (newest last).
        self.stats_history: list[dict[str, Any]] = []
        # create communication for stats
        self.requestor = InterProcessRequestor()

    def get_latest_stats(self) -> dict[str, Any]:
        """Return the newest snapshot, taking one first if none exists yet.

        When the history is empty a fresh snapshot is collected, appended to
        the history, and returned.
        """
        if self.stats_history:
            return self.stats_history[-1]

        stats = stats_snapshot(
            self.config, self.stats_tracking, self.hwaccel_errors
        )
        self.stats_history.append(stats)
        return stats

    def get_stats_history(
        self, keys: Optional[list[str]] = None
    ) -> list[dict[str, Any]]:
        """Return the snapshot history, optionally reduced to selected keys.

        Args:
            keys: Top-level snapshot keys to keep. When ``None`` or empty the
                internal history list itself is returned (not a copy).

        Returns:
            One dict per stored snapshot. With ``keys`` given, each dict holds
            exactly those keys, using ``None`` for keys a snapshot lacks.
        """
        if not keys:
            return self.stats_history

        return [
            {key: snapshot.get(key) for key in keys}
            for snapshot in self.stats_history
        ]

    # Declared static because the function takes no ``self``: without the
    # decorator an instance call would bind the instance to ``config``.
    @staticmethod
    def stats_init(
        config: Any, camera_metrics: Any, detectors: Any, processes: Any
    ) -> dict[str, Any]:
        """Bundle the initial stats sources and push them to Prometheus.

        Args:
            config: Accepted for interface compatibility; not used here.
            camera_metrics: Stored under the ``"cameras"`` key.
            detectors: Stored under the ``"detectors"`` key.
            processes: Stored under the ``"processes"`` key.

        Returns:
            The dict that was passed to ``update_metrics``.
        """
        stats = {
            "cameras": camera_metrics,
            "detectors": detectors,
            "processes": processes,
        }
        # Update Prometheus metrics with initial stats
        update_metrics(stats)
        return stats

    def run(self) -> None:
        """Collect a snapshot on every tick until the stop event is set.

        Each tick waits ``FREQUENCY_STATS_POINTS`` seconds on the stop event,
        takes a snapshot, trims the history to the last ``MAX_STATS_POINTS``
        entries, and publishes the snapshot when the cycling counter is 0.
        """
        # Startup delay before the first collection (presumably to let the
        # rest of the application come up; not interruptible by stop_event).
        time.sleep(10)

        # Publish on every Nth snapshot. Clamp N to at least 1:
        # itertools.cycle over an empty range yields nothing, which would end
        # this thread immediately if stats_interval < FREQUENCY_STATS_POINTS.
        snapshots_per_publish = max(
            1, int(self.config.mqtt.stats_interval / FREQUENCY_STATS_POINTS)
        )

        for counter in itertools.cycle(range(snapshots_per_publish)):
            # wait() returns True as soon as the event is set, False on timeout.
            if self.stop_event.wait(FREQUENCY_STATS_POINTS):
                break

            logger.debug("Starting stats collection")
            stats = stats_snapshot(
                self.config, self.stats_tracking, self.hwaccel_errors
            )
            self.stats_history.append(stats)
            # Keep only the newest MAX_STATS_POINTS snapshots.
            self.stats_history = self.stats_history[-MAX_STATS_POINTS:]

            if counter == 0:
                self.requestor.send_data("stats", json.dumps(stats))

            logger.debug("Finished stats collection")

        logger.info("Exiting stats emitter...")