blakeblackshear.frigate/frigate/stats.py
Nicolas Mowen 6c0978498d
Abstract MQTT from communication and make mqtt optional (#4462)
* Add option for mqtt config

* Setup communication layer

* Have a dispatcher which is responsible for handling and sending messages

* Move mqtt to communication

* Separate ws communications module

* Make ws client conform to communicator

* Cleanup imports

* Migrate to new dispatcher

* Clean up

* Need to set topic prefix

* Remove references to mqtt in dispatcher

* Don't start mqtt until dispatcher is subscribed

* Cleanup

* Shorten package

* Formatting

* Remove unused

* Cleanup

* Rename mqtt to ws on web

* Fix ws mypy

* Fix mypy

* Reformat

* Cleanup if/else chain

* Catch bad set commands
2022-11-23 20:03:20 -06:00

169 lines
5.2 KiB
Python

import json
import logging
import threading
import time
import psutil
import shutil
import os
import requests
from typing import Optional, Any
from multiprocessing.synchronize import Event as MpEvent
from frigate.comms.dispatcher import Dispatcher
from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
from frigate.types import StatsTrackingTypes, CameraMetricsTypes
from frigate.version import VERSION
from frigate.util import get_cpu_stats
from frigate.object_detection import ObjectDetectProcess
logger = logging.getLogger(__name__)
def get_latest_version() -> str:
try:
request = requests.get(
"https://api.github.com/repos/blakeblackshear/frigate/releases/latest",
timeout=10,
)
except:
return "unknown"
response = request.json()
if request.ok and response and "tag_name" in response:
return str(response.get("tag_name").replace("v", ""))
else:
return "unknown"
def stats_init(
camera_metrics: dict[str, CameraMetricsTypes],
detectors: dict[str, ObjectDetectProcess],
) -> StatsTrackingTypes:
stats_tracking: StatsTrackingTypes = {
"camera_metrics": camera_metrics,
"detectors": detectors,
"started": int(time.time()),
"latest_frigate_version": get_latest_version(),
}
return stats_tracking
def get_fs_type(path: str) -> str:
bestMatch = ""
fsType = ""
for part in psutil.disk_partitions(all=True):
if path.startswith(part.mountpoint) and len(bestMatch) < len(part.mountpoint):
fsType = part.fstype
bestMatch = part.mountpoint
return fsType
def read_temperature(path: str) -> Optional[float]:
if os.path.isfile(path):
with open(path) as f:
line = f.readline().strip()
return int(line) / 1000
return None
def get_temperatures() -> dict[str, float]:
temps = {}
# Get temperatures for all attached Corals
base = "/sys/class/apex/"
if os.path.isdir(base):
for apex in os.listdir(base):
temp = read_temperature(os.path.join(base, apex, "temp"))
if temp is not None:
temps[apex] = temp
return temps
def stats_snapshot(stats_tracking: StatsTrackingTypes) -> dict[str, Any]:
camera_metrics = stats_tracking["camera_metrics"]
stats: dict[str, Any] = {}
total_detection_fps = 0
for name, camera_stats in camera_metrics.items():
total_detection_fps += camera_stats["detection_fps"].value
pid = camera_stats["process"].pid if camera_stats["process"] else None
ffmpeg_pid = (
camera_stats["ffmpeg_pid"].value if camera_stats["ffmpeg_pid"] else None
)
cpid = (
camera_stats["capture_process"].pid
if camera_stats["capture_process"]
else None
)
stats[name] = {
"camera_fps": round(camera_stats["camera_fps"].value, 2),
"process_fps": round(camera_stats["process_fps"].value, 2),
"skipped_fps": round(camera_stats["skipped_fps"].value, 2),
"detection_fps": round(camera_stats["detection_fps"].value, 2),
"pid": pid,
"capture_pid": cpid,
"ffmpeg_pid": ffmpeg_pid,
}
stats["detectors"] = {}
for name, detector in stats_tracking["detectors"].items():
pid = detector.detect_process.pid if detector.detect_process else None
stats["detectors"][name] = {
"inference_speed": round(detector.avg_inference_speed.value * 1000, 2),
"detection_start": detector.detection_start.value,
"pid": pid,
}
stats["detection_fps"] = round(total_detection_fps, 2)
stats["cpu_usages"] = get_cpu_stats()
stats["service"] = {
"uptime": (int(time.time()) - stats_tracking["started"]),
"version": VERSION,
"latest_version": stats_tracking["latest_frigate_version"],
"storage": {},
"temperatures": get_temperatures(),
}
for path in [RECORD_DIR, CLIPS_DIR, CACHE_DIR, "/dev/shm"]:
storage_stats = shutil.disk_usage(path)
stats["service"]["storage"][path] = {
"total": round(storage_stats.total / 1000000, 1),
"used": round(storage_stats.used / 1000000, 1),
"free": round(storage_stats.free / 1000000, 1),
"mount_type": get_fs_type(path),
}
return stats
class StatsEmitter(threading.Thread):
def __init__(
self,
config: FrigateConfig,
stats_tracking: StatsTrackingTypes,
dispatcher: Dispatcher,
topic_prefix: str,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "frigate_stats_emitter"
self.config = config
self.stats_tracking = stats_tracking
self.dispatcher = dispatcher
self.topic_prefix = topic_prefix
self.stop_event = stop_event
def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(self.config.mqtt.stats_interval):
stats = stats_snapshot(self.stats_tracking)
self.dispatcher.publish(
f"{self.topic_prefix}/stats", json.dumps(stats), retain=False
)
logger.info(f"Exiting watchdog...")