Fix Prometheus Metrics race condition (#17187)

* fixed metrics race condition

* ruff formatting
This commit is contained in:
Logan Garrett 2025-03-16 19:48:25 -04:00 committed by GitHub
parent c724892158
commit 6e3ae0afc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@@ -12,7 +12,8 @@ from prometheus_client.core import (
class CustomCollector(object): class CustomCollector(object):
def __init__(self, _url): def __init__(self, _url):
self.process_stats = {} self.complete_stats = {} # Store complete stats data
self.process_stats = {} # Keep for CPU processing
self.previous_event_id = None self.previous_event_id = None
self.previous_event_start_time = None self.previous_event_start_time = None
self.all_events = {} self.all_events = {}
@@ -34,30 +35,34 @@ class CustomCollector(object):
process_name, process_name,
cpu_or_memory, cpu_or_memory,
process_type, process_type,
cpu_usages,
): ):
try: try:
pid = str(camera_stats[pid_name]) pid = str(camera_stats[pid_name])
label_values = [pid, camera_name, process_name, process_type] label_values = [pid, camera_name, process_name, process_type]
try: try:
# new frigate:0.13.0-beta3 stat 'cmdline' # new frigate:0.13.0-beta3 stat 'cmdline'
label_values.append(self.process_stats[pid]["cmdline"]) label_values.append(cpu_usages[pid]["cmdline"])
except KeyError: except KeyError:
pass pass
metric.add_metric(label_values, self.process_stats[pid][cpu_or_memory]) metric.add_metric(label_values, cpu_usages[pid][cpu_or_memory])
del self.process_stats[pid][cpu_or_memory] # Don't modify the original data
except (KeyError, TypeError, IndexError): except (KeyError, TypeError, IndexError):
pass pass
def collect(self): def collect(self):
stats = self.process_stats # Assign self.process_stats to local variable stats # Work with a copy of the complete stats
stats = self.complete_stats.copy()
# Create a local copy of CPU usages to work with
cpu_usages = {}
try: try:
self.process_stats = stats["cpu_usages"] cpu_usages = stats.get("cpu_usages", {}).copy()
except KeyError: except (KeyError, AttributeError):
pass pass
# process stats for cameras, detectors and other # process stats for cameras, detectors and other
cpu_usages = GaugeMetricFamily( cpu_usages_metric = GaugeMetricFamily(
"frigate_cpu_usage_percent", "frigate_cpu_usage_percent",
"Process CPU usage %", "Process CPU usage %",
labels=["pid", "name", "process", "type", "cmdline"], labels=["pid", "name", "process", "type", "cmdline"],
@@ -121,25 +126,34 @@ class CustomCollector(object):
self.add_metric(skipped_fps, [camera_name], camera_stats, "skipped_fps") self.add_metric(skipped_fps, [camera_name], camera_stats, "skipped_fps")
self.add_metric_process( self.add_metric_process(
cpu_usages, cpu_usages_metric,
camera_stats, camera_stats,
camera_name, camera_name,
"ffmpeg_pid", "ffmpeg_pid",
"ffmpeg", "ffmpeg",
"cpu", "cpu",
"Camera", "Camera",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
cpu_usages, cpu_usages_metric,
camera_stats, camera_stats,
camera_name, camera_name,
"capture_pid", "capture_pid",
"capture", "capture",
"cpu", "cpu",
"Camera", "Camera",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
cpu_usages, camera_stats, camera_name, "pid", "detect", "cpu", "Camera" cpu_usages_metric,
camera_stats,
camera_name,
"pid",
"detect",
"cpu",
"Camera",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
@@ -150,6 +164,7 @@ class CustomCollector(object):
"ffmpeg", "ffmpeg",
"mem", "mem",
"Camera", "Camera",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
mem_usages, mem_usages,
@@ -159,9 +174,17 @@ class CustomCollector(object):
"capture", "capture",
"mem", "mem",
"Camera", "Camera",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
mem_usages, camera_stats, camera_name, "pid", "detect", "mem", "Camera" mem_usages,
camera_stats,
camera_name,
"pid",
"detect",
"mem",
"Camera",
cpu_usages,
) )
yield audio_dBFS yield audio_dBFS
@@ -239,13 +262,14 @@ class CustomCollector(object):
"detection_start", "detection_start",
) )
self.add_metric_process( self.add_metric_process(
cpu_usages, cpu_usages_metric,
stats["detectors"], stats["detectors"],
detector_name, detector_name,
"pid", "pid",
"detect", "detect",
"cpu", "cpu",
"Detector", "Detector",
cpu_usages,
) )
self.add_metric_process( self.add_metric_process(
mem_usages, mem_usages,
@@ -255,6 +279,7 @@ class CustomCollector(object):
"detect", "detect",
"mem", "mem",
"Detector", "Detector",
cpu_usages,
) )
except KeyError: except KeyError:
pass pass
@@ -272,10 +297,10 @@ class CustomCollector(object):
label.append(detector_name) # name label label.append(detector_name) # name label
label.append(detector_name) # process label label.append(detector_name) # process label
label.append("detectors") # type label label.append("detectors") # type label
label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label label.append(cpu_usages[p_pid]["cmdline"]) # cmdline label
self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu") self.add_metric(cpu_usages_metric, label, cpu_usages[p_pid], "cpu")
self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem") self.add_metric(mem_usages, label, cpu_usages[p_pid], "mem")
del self.process_stats[p_pid] # Don't modify the original data
except KeyError: except KeyError:
pass pass
@@ -292,10 +317,10 @@ class CustomCollector(object):
label.append(process_name) # name label label.append(process_name) # name label
label.append(process_name) # process label label.append(process_name) # process label
label.append(process_name) # type label label.append(process_name) # type label
label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label label.append(cpu_usages[p_pid]["cmdline"]) # cmdline label
self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu") self.add_metric(cpu_usages_metric, label, cpu_usages[p_pid], "cpu")
self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem") self.add_metric(mem_usages, label, cpu_usages[p_pid], "mem")
del self.process_stats[p_pid] # Don't modify the original data
except KeyError: except KeyError:
pass pass
@@ -304,7 +329,7 @@ class CustomCollector(object):
# remaining process stats # remaining process stats
try: try:
for process_id, pid_stats in self.process_stats.items(): for process_id, pid_stats in cpu_usages.items():
label = [process_id] # pid label label = [process_id] # pid label
try: try:
# new frigate:0.13.0-beta3 stat 'cmdline' # new frigate:0.13.0-beta3 stat 'cmdline'
@@ -314,12 +339,12 @@ class CustomCollector(object):
label.append(pid_stats["cmdline"]) # cmdline label label.append(pid_stats["cmdline"]) # cmdline label
except KeyError: except KeyError:
pass pass
self.add_metric(cpu_usages, label, pid_stats, "cpu") self.add_metric(cpu_usages_metric, label, pid_stats, "cpu")
self.add_metric(mem_usages, label, pid_stats, "mem") self.add_metric(mem_usages, label, pid_stats, "mem")
except KeyError: except KeyError:
pass pass
yield cpu_usages yield cpu_usages_metric
yield mem_usages yield mem_usages
# gpu stats # gpu stats
@@ -481,10 +506,13 @@ REGISTRY.register(collector)
def update_metrics(stats): def update_metrics(stats):
"""Updates the Prometheus metrics with the given stats data.""" """Updates the Prometheus metrics with the given stats data."""
try: try:
collector.process_stats = stats # Directly assign the stats data # Store the complete stats for later use by collect()
# Important: Since we are not fetching from URL, we need to manually call collect collector.complete_stats = stats.copy()
for _ in collector.collect():
pass # For backwards compatibility
collector.process_stats = stats.copy()
# No need to call collect() here - it will be called by get_metrics()
except Exception as e: except Exception as e:
logging.error(f"Error updating metrics: {e}") logging.error(f"Error updating metrics: {e}")