Fix prometheus client exporter (#16620)

* wip

* wip

* put it back

* formatter

* Delete hailort.log

* Delete hailort.log

* lint

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
This commit is contained in:
Mitch Ross 2025-02-17 08:17:15 -05:00 committed by GitHub
parent 349abc8d1b
commit 2020cdffd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 486 additions and 193 deletions

View File

@ -38,4 +38,4 @@ services:
container_name: mqtt container_name: mqtt
image: eclipse-mosquitto:1.6 image: eclipse-mosquitto:1.6
ports: ports:
- "1883:1883" - "1883:1883"

View File

@ -68,3 +68,4 @@ netaddr==0.8.*
netifaces==0.10.* netifaces==0.10.*
verboselogs==1.7.* verboselogs==1.7.*
virtualenv==20.17.* virtualenv==20.17.*
prometheus-client == 0.21.*

View File

@ -20,7 +20,6 @@ from fastapi.params import Depends
from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse
from markupsafe import escape from markupsafe import escape
from peewee import operator from peewee import operator
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import ValidationError from pydantic import ValidationError
from frigate.api.defs.query.app_query_parameters import AppTimelineHourlyQueryParameters from frigate.api.defs.query.app_query_parameters import AppTimelineHourlyQueryParameters
@ -28,6 +27,7 @@ from frigate.api.defs.request.app_body import AppConfigSetBody
from frigate.api.defs.tags import Tags from frigate.api.defs.tags import Tags
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.models import Event, Timeline from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.util.builtin import ( from frigate.util.builtin import (
clean_camera_user_pass, clean_camera_user_pass,
get_tz_modifiers, get_tz_modifiers,
@ -113,9 +113,13 @@ def stats_history(request: Request, keys: str = None):
@router.get("/metrics")
def metrics(request: Request):
    """Expose Prometheus metrics endpoint and update metrics with latest stats"""
    # Push the stats emitter's most recent snapshot into the collector first,
    # so the payload rendered below reflects it.
    latest_stats = request.app.stats_emitter.get_latest_stats()
    update_metrics(latest_stats)

    # get_metrics() returns the rendered payload together with its media type.
    body, media_type = get_metrics()
    return Response(content=body, media_type=media_type)
@router.get("/config") @router.get("/config")

View File

@ -1,207 +1,495 @@
from typing import Dict import logging
import re
from prometheus_client import ( from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
CONTENT_TYPE_LATEST, from prometheus_client.core import (
Counter, REGISTRY,
Gauge, CounterMetricFamily,
Info, GaugeMetricFamily,
generate_latest, InfoMetricFamily,
)
# System metrics
SYSTEM_INFO = Info("frigate_system", "System information")
CPU_USAGE = Gauge(
"frigate_cpu_usage_percent",
"Process CPU usage %",
["pid", "name", "process", "type", "cmdline"],
)
MEMORY_USAGE = Gauge(
"frigate_mem_usage_percent",
"Process memory usage %",
["pid", "name", "process", "type", "cmdline"],
)
# Camera metrics
CAMERA_FPS = Gauge(
"frigate_camera_fps",
"Frames per second being consumed from your camera",
["camera_name"],
)
DETECTION_FPS = Gauge(
"frigate_detection_fps",
"Number of times detection is run per second",
["camera_name"],
)
PROCESS_FPS = Gauge(
"frigate_process_fps",
"Frames per second being processed by frigate",
["camera_name"],
)
SKIPPED_FPS = Gauge(
"frigate_skipped_fps", "Frames per second skipped for processing", ["camera_name"]
)
DETECTION_ENABLED = Gauge(
"frigate_detection_enabled", "Detection enabled for camera", ["camera_name"]
)
AUDIO_DBFS = Gauge("frigate_audio_dBFS", "Audio dBFS for camera", ["camera_name"])
AUDIO_RMS = Gauge("frigate_audio_rms", "Audio RMS for camera", ["camera_name"])
# Detector metrics
DETECTOR_INFERENCE = Gauge(
"frigate_detector_inference_speed_seconds",
"Time spent running object detection in seconds",
["name"],
)
DETECTOR_START = Gauge(
"frigate_detection_start", "Detector start time (unix timestamp)", ["name"]
)
# GPU metrics
GPU_USAGE = Gauge("frigate_gpu_usage_percent", "GPU utilisation %", ["gpu_name"])
GPU_MEMORY = Gauge("frigate_gpu_mem_usage_percent", "GPU memory usage %", ["gpu_name"])
# Storage metrics
STORAGE_FREE = Gauge("frigate_storage_free_bytes", "Storage free bytes", ["storage"])
STORAGE_TOTAL = Gauge("frigate_storage_total_bytes", "Storage total bytes", ["storage"])
STORAGE_USED = Gauge("frigate_storage_used_bytes", "Storage used bytes", ["storage"])
STORAGE_MOUNT = Info(
"frigate_storage_mount_type", "Storage mount type", ["mount_type", "storage"]
)
# Service metrics
UPTIME = Gauge("frigate_service_uptime_seconds", "Uptime seconds")
LAST_UPDATE = Gauge(
"frigate_service_last_updated_timestamp", "Stats recorded time (unix timestamp)"
)
TEMPERATURE = Gauge("frigate_device_temperature", "Device Temperature", ["device"])
# Event metrics
CAMERA_EVENTS = Counter(
"frigate_camera_events",
"Count of camera events since exporter started",
["camera", "label"],
) )
def update_metrics(stats: Dict) -> None: class CustomCollector(object):
"""Update Prometheus metrics based on Frigate stats""" def __init__(self, _url):
try: self.process_stats = {}
# Update process metrics self.previous_event_id = None
if "cpu_usages" in stats: self.previous_event_start_time = None
for pid, proc_stats in stats["cpu_usages"].items(): self.all_events = {}
cmdline = proc_stats.get("cmdline", "")
process_type = "Other"
process_name = cmdline
CPU_USAGE.labels( def add_metric(self, metric, label, stats, key, multiplier=1.0): # Now a method
pid=pid, try:
name=process_name, string = str(stats[key])
process=process_name, value = float(re.findall(r"-?\d*\.?\d*", string)[0])
type=process_type, metric.add_metric(label, value * multiplier)
cmdline=cmdline, except (KeyError, TypeError, IndexError, ValueError):
).set(float(proc_stats["cpu"])) pass
MEMORY_USAGE.labels( def add_metric_process(
pid=pid, self,
name=process_name, metric,
process=process_name, camera_stats,
type=process_type, camera_name,
cmdline=cmdline, pid_name,
).set(float(proc_stats["mem"])) process_name,
cpu_or_memory,
process_type,
):
try:
pid = str(camera_stats[pid_name])
label_values = [pid, camera_name, process_name, process_type]
try:
# new frigate:0.13.0-beta3 stat 'cmdline'
label_values.append(self.process_stats[pid]["cmdline"])
except KeyError:
pass
metric.add_metric(label_values, self.process_stats[pid][cpu_or_memory])
del self.process_stats[pid][cpu_or_memory]
except (KeyError, TypeError, IndexError):
pass
# Update camera metrics def collect(self):
if "cameras" in stats: stats = self.process_stats # Assign self.process_stats to local variable stats
for camera_name, camera_stats in stats["cameras"].items():
if "camera_fps" in camera_stats:
CAMERA_FPS.labels(camera_name=camera_name).set(
camera_stats["camera_fps"]
)
if "detection_fps" in camera_stats:
DETECTION_FPS.labels(camera_name=camera_name).set(
camera_stats["detection_fps"]
)
if "process_fps" in camera_stats:
PROCESS_FPS.labels(camera_name=camera_name).set(
camera_stats["process_fps"]
)
if "skipped_fps" in camera_stats:
SKIPPED_FPS.labels(camera_name=camera_name).set(
camera_stats["skipped_fps"]
)
if "detection_enabled" in camera_stats:
DETECTION_ENABLED.labels(camera_name=camera_name).set(
camera_stats["detection_enabled"]
)
if "audio_dBFS" in camera_stats:
AUDIO_DBFS.labels(camera_name=camera_name).set(
camera_stats["audio_dBFS"]
)
if "audio_rms" in camera_stats:
AUDIO_RMS.labels(camera_name=camera_name).set(
camera_stats["audio_rms"]
)
# Update detector metrics try:
if "detectors" in stats: self.process_stats = stats["cpu_usages"]
for name, detector in stats["detectors"].items(): except KeyError:
if "inference_speed" in detector: pass
DETECTOR_INFERENCE.labels(name=name).set(
detector["inference_speed"] * 0.001
) # ms to seconds
if "detection_start" in detector:
DETECTOR_START.labels(name=name).set(detector["detection_start"])
# Update GPU metrics # process stats for cameras, detectors and other
if "gpu_usages" in stats: cpu_usages = GaugeMetricFamily(
for gpu_name, gpu_stats in stats["gpu_usages"].items(): "frigate_cpu_usage_percent",
if "gpu" in gpu_stats: "Process CPU usage %",
GPU_USAGE.labels(gpu_name=gpu_name).set(float(gpu_stats["gpu"])) labels=["pid", "name", "process", "type", "cmdline"],
if "mem" in gpu_stats: )
GPU_MEMORY.labels(gpu_name=gpu_name).set(float(gpu_stats["mem"])) mem_usages = GaugeMetricFamily(
"frigate_mem_usage_percent",
"Process memory usage %",
labels=["pid", "name", "process", "type", "cmdline"],
)
# Update service metrics # camera stats
if "service" in stats: audio_dBFS = GaugeMetricFamily(
service = stats["service"] "frigate_audio_dBFS", "Audio dBFS for camera", labels=["camera_name"]
)
audio_rms = GaugeMetricFamily(
"frigate_audio_rms", "Audio RMS for camera", labels=["camera_name"]
)
camera_fps = GaugeMetricFamily(
"frigate_camera_fps",
"Frames per second being consumed from your camera.",
labels=["camera_name"],
)
detection_enabled = GaugeMetricFamily(
"frigate_detection_enabled",
"Detection enabled for camera",
labels=["camera_name"],
)
detection_fps = GaugeMetricFamily(
"frigate_detection_fps",
"Number of times detection is run per second.",
labels=["camera_name"],
)
process_fps = GaugeMetricFamily(
"frigate_process_fps",
"Frames per second being processed by frigate.",
labels=["camera_name"],
)
skipped_fps = GaugeMetricFamily(
"frigate_skipped_fps",
"Frames per second skip for processing by frigate.",
labels=["camera_name"],
)
if "uptime" in service: # read camera stats assuming version < frigate:0.13.0-beta3
UPTIME.set(service["uptime"]) cameras = stats
if "last_updated" in service: try:
LAST_UPDATE.set(service["last_updated"]) # try to read camera stats in case >= frigate:0.13.0-beta3
cameras = stats["cameras"]
except KeyError:
pass
# Storage metrics for camera_name, camera_stats in cameras.items():
if "storage" in service: self.add_metric(audio_dBFS, [camera_name], camera_stats, "audio_dBFS")
for path, storage in service["storage"].items(): self.add_metric(audio_rms, [camera_name], camera_stats, "audio_rms")
if "free" in storage: self.add_metric(camera_fps, [camera_name], camera_stats, "camera_fps")
STORAGE_FREE.labels(storage=path).set( self.add_metric(
storage["free"] * 1e6 detection_enabled, [camera_name], camera_stats, "detection_enabled"
) # MB to bytes )
if "total" in storage: self.add_metric(detection_fps, [camera_name], camera_stats, "detection_fps")
STORAGE_TOTAL.labels(storage=path).set(storage["total"] * 1e6) self.add_metric(process_fps, [camera_name], camera_stats, "process_fps")
if "used" in storage: self.add_metric(skipped_fps, [camera_name], camera_stats, "skipped_fps")
STORAGE_USED.labels(storage=path).set(storage["used"] * 1e6)
if "mount_type" in storage:
STORAGE_MOUNT.labels(storage=path).info(
{"mount_type": storage["mount_type"], "storage": path}
)
# Temperature metrics self.add_metric_process(
if "temperatures" in service: cpu_usages,
for device, temp in service["temperatures"].items(): camera_stats,
TEMPERATURE.labels(device=device).set(temp) camera_name,
"ffmpeg_pid",
"ffmpeg",
"cpu",
"Camera",
)
self.add_metric_process(
cpu_usages,
camera_stats,
camera_name,
"capture_pid",
"capture",
"cpu",
"Camera",
)
self.add_metric_process(
cpu_usages, camera_stats, camera_name, "pid", "detect", "cpu", "Camera"
)
# Version info self.add_metric_process(
if "version" in service and "latest_version" in service: mem_usages,
SYSTEM_INFO.info( camera_stats,
{ camera_name,
"version": service["version"], "ffmpeg_pid",
"latest_version": service["latest_version"], "ffmpeg",
} "mem",
"Camera",
)
self.add_metric_process(
mem_usages,
camera_stats,
camera_name,
"capture_pid",
"capture",
"mem",
"Camera",
)
self.add_metric_process(
mem_usages, camera_stats, camera_name, "pid", "detect", "mem", "Camera"
)
yield audio_dBFS
yield audio_rms
yield camera_fps
yield detection_enabled
yield detection_fps
yield process_fps
yield skipped_fps
# bandwidth stats
bandwidth_usages = GaugeMetricFamily(
"frigate_bandwidth_usages_kBps",
"bandwidth usages kilobytes per second",
labels=["pid", "name", "process", "cmdline"],
)
try:
for b_pid, b_stats in stats["bandwidth_usages"].items():
label = [b_pid] # pid label
try:
n = stats["cpu_usages"][b_pid]["cmdline"]
for p_name, p_stats in stats["processes"].items():
if str(p_stats["pid"]) == b_pid:
n = p_name
break
# new frigate:0.13.0-beta3 stat 'cmdline'
label.append(n) # name label
label.append(stats["cpu_usages"][b_pid]["cmdline"]) # process label
label.append(stats["cpu_usages"][b_pid]["cmdline"]) # cmdline label
self.add_metric(bandwidth_usages, label, b_stats, "bandwidth")
except KeyError:
pass
except KeyError:
pass
yield bandwidth_usages
# detector stats
try:
yield GaugeMetricFamily(
"frigate_detection_total_fps",
"Sum of detection_fps across all cameras and detectors.",
value=stats["detection_fps"],
)
except KeyError:
pass
detector_inference_speed = GaugeMetricFamily(
"frigate_detector_inference_speed_seconds",
"Time spent running object detection in seconds.",
labels=["name"],
)
detector_detection_start = GaugeMetricFamily(
"frigate_detection_start",
"Detector start time (unix timestamp)",
labels=["name"],
)
try:
for detector_name, detector_stats in stats["detectors"].items():
self.add_metric(
detector_inference_speed,
[detector_name],
detector_stats,
"inference_speed",
0.001,
) # ms to seconds
self.add_metric(
detector_detection_start,
[detector_name],
detector_stats,
"detection_start",
) )
self.add_metric_process(
cpu_usages,
stats["detectors"],
detector_name,
"pid",
"detect",
"cpu",
"Detector",
)
self.add_metric_process(
mem_usages,
stats["detectors"],
detector_name,
"pid",
"detect",
"mem",
"Detector",
)
except KeyError:
pass
yield detector_inference_speed
yield detector_detection_start
# detector process stats
try:
for detector_name, detector_stats in stats["detectors"].items():
p_pid = str(detector_stats["pid"])
label = [p_pid] # pid label
try:
# new frigate:0.13.0-beta3 stat 'cmdline'
label.append(detector_name) # name label
label.append(detector_name) # process label
label.append("detectors") # type label
label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label
self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu")
self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem")
del self.process_stats[p_pid]
except KeyError:
pass
except KeyError:
pass
# other named process stats
try:
for process_name, process_stats in stats["processes"].items():
p_pid = str(process_stats["pid"])
label = [p_pid] # pid label
try:
# new frigate:0.13.0-beta3 stat 'cmdline'
label.append(process_name) # name label
label.append(process_name) # process label
label.append(process_name) # type label
label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label
self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu")
self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem")
del self.process_stats[p_pid]
except KeyError:
pass
except KeyError:
pass
# remaining process stats
try:
for process_id, pid_stats in self.process_stats.items():
label = [process_id] # pid label
try:
# new frigate:0.13.0-beta3 stat 'cmdline'
label.append(pid_stats["cmdline"]) # name label
label.append(pid_stats["cmdline"]) # process label
label.append("Other") # type label
label.append(pid_stats["cmdline"]) # cmdline label
except KeyError:
pass
self.add_metric(cpu_usages, label, pid_stats, "cpu")
self.add_metric(mem_usages, label, pid_stats, "mem")
except KeyError:
pass
yield cpu_usages
yield mem_usages
# gpu stats
gpu_usages = GaugeMetricFamily(
"frigate_gpu_usage_percent", "GPU utilisation %", labels=["gpu_name"]
)
gpu_mem_usages = GaugeMetricFamily(
"frigate_gpu_mem_usage_percent", "GPU memory usage %", labels=["gpu_name"]
)
try:
for gpu_name, gpu_stats in stats["gpu_usages"].items():
self.add_metric(gpu_usages, [gpu_name], gpu_stats, "gpu")
self.add_metric(gpu_mem_usages, [gpu_name], gpu_stats, "mem")
except KeyError:
pass
yield gpu_usages
yield gpu_mem_usages
# service stats
uptime_seconds = GaugeMetricFamily(
"frigate_service_uptime_seconds", "Uptime seconds"
)
last_updated_timestamp = GaugeMetricFamily(
"frigate_service_last_updated_timestamp",
"Stats recorded time (unix timestamp)",
)
try:
service_stats = stats["service"]
self.add_metric(uptime_seconds, [""], service_stats, "uptime")
self.add_metric(last_updated_timestamp, [""], service_stats, "last_updated")
info = {
"latest_version": stats["service"]["latest_version"],
"version": stats["service"]["version"],
}
yield InfoMetricFamily(
"frigate_service", "Frigate version info", value=info
)
except KeyError:
pass
yield uptime_seconds
yield last_updated_timestamp
temperatures = GaugeMetricFamily(
"frigate_device_temperature", "Device Temperature", labels=["device"]
)
try:
for device_name in stats["service"]["temperatures"]:
self.add_metric(
temperatures,
[device_name],
stats["service"]["temperatures"],
device_name,
)
except KeyError:
pass
yield temperatures
storage_free = GaugeMetricFamily(
"frigate_storage_free_bytes", "Storage free bytes", labels=["storage"]
)
storage_mount_type = InfoMetricFamily(
"frigate_storage_mount_type",
"Storage mount type",
labels=["mount_type", "storage"],
)
storage_total = GaugeMetricFamily(
"frigate_storage_total_bytes", "Storage total bytes", labels=["storage"]
)
storage_used = GaugeMetricFamily(
"frigate_storage_used_bytes", "Storage used bytes", labels=["storage"]
)
try:
for storage_path, storage_stats in stats["service"]["storage"].items():
self.add_metric(
storage_free, [storage_path], storage_stats, "free", 1e6
) # MB to bytes
self.add_metric(
storage_total, [storage_path], storage_stats, "total", 1e6
) # MB to bytes
self.add_metric(
storage_used, [storage_path], storage_stats, "used", 1e6
) # MB to bytes
storage_mount_type.add_metric(
storage_path,
{
"mount_type": storage_stats["mount_type"],
"storage": storage_path,
},
)
except KeyError:
pass
yield storage_free
yield storage_mount_type
yield storage_total
yield storage_used
# count events
events = []
if len(events) > 0:
# events[0] is newest event, last element is oldest, don't need to sort
if not self.previous_event_id:
# ignore all previous events on startup, prometheus might have already counted them
self.previous_event_id = events[0]["id"]
self.previous_event_start_time = int(events[0]["start_time"])
for event in events:
# break if event already counted
if event["id"] == self.previous_event_id:
break
# break if event starts before previous event
if event["start_time"] < self.previous_event_start_time:
break
# store counted events in a dict
try:
cam = self.all_events[event["camera"]]
try:
cam[event["label"]] += 1
except KeyError:
# create label dict if not exists
cam.update({event["label"]: 1})
except KeyError:
# create camera and label dict if not exists
self.all_events.update({event["camera"]: {event["label"]: 1}})
# don't recount events next time
self.previous_event_id = events[0]["id"]
self.previous_event_start_time = int(events[0]["start_time"])
camera_events = CounterMetricFamily(
"frigate_camera_events",
"Count of camera events since exporter started",
labels=["camera", "label"],
)
for camera, cam_dict in self.all_events.items():
for label, label_value in cam_dict.items():
camera_events.add_metric([camera, label], label_value)
yield camera_events
# One module-level collector instance: update_metrics() stores stats on it and
# the default registry reads from it when metrics are rendered.
collector = CustomCollector(_url=None)
REGISTRY.register(collector)
def update_metrics(stats):
    """Hand the latest stats snapshot to the Prometheus collector.

    The collector is pull-based: the registry calls ``collector.collect()``
    when metrics are rendered via ``generate_latest(REGISTRY)``, so this
    function only stores the data. It deliberately does not run
    ``collect()`` itself. ``collect()`` rebinds ``collector.process_stats``
    to the ``cpu_usages`` sub-dict and deletes entries from it, so a manual
    pass would leave the real scrape with a consumed, wrongly shaped dict.

    Args:
        stats: The stats dict to export. ``None`` or an empty dict is stored
            as ``{}``.

    Returns:
        None. Failures are logged rather than raised, so a bad snapshot does
        not propagate out of this call.
    """
    import copy  # local import keeps the block self-contained

    try:
        # Deep copy because collect() deletes keys from the data it is given;
        # without the copy those deletions would land in the caller's dict.
        collector.process_stats = copy.deepcopy(stats) if stats else {}
    except Exception as e:
        logging.error("Error updating metrics: %s", e)
def get_metrics():
    """Render every metric in the default registry.

    Returns a ``(payload, content_type)`` pair: the output of
    ``generate_latest(REGISTRY)`` and the ``CONTENT_TYPE_LATEST`` constant.
    """
    return generate_latest(REGISTRY), CONTENT_TYPE_LATEST