From 6e3ae0afc2fc95fcb286926c1b1f59b07d189ec3 Mon Sep 17 00:00:00 2001 From: Logan Garrett Date: Sun, 16 Mar 2025 19:48:25 -0400 Subject: [PATCH] Fix Prometheus Metrics race condition (#17187) * fixed metrics race condition * ruff formatting --- frigate/stats/prometheus.py | 84 ++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/frigate/stats/prometheus.py b/frigate/stats/prometheus.py index 015e551af..bc545f21d 100644 --- a/frigate/stats/prometheus.py +++ b/frigate/stats/prometheus.py @@ -12,7 +12,8 @@ from prometheus_client.core import ( class CustomCollector(object): def __init__(self, _url): - self.process_stats = {} + self.complete_stats = {} # Store complete stats data + self.process_stats = {} # Keep for CPU processing self.previous_event_id = None self.previous_event_start_time = None self.all_events = {} @@ -34,30 +35,34 @@ class CustomCollector(object): process_name, cpu_or_memory, process_type, + cpu_usages, ): try: pid = str(camera_stats[pid_name]) label_values = [pid, camera_name, process_name, process_type] try: # new frigate:0.13.0-beta3 stat 'cmdline' - label_values.append(self.process_stats[pid]["cmdline"]) + label_values.append(cpu_usages[pid]["cmdline"]) except KeyError: pass - metric.add_metric(label_values, self.process_stats[pid][cpu_or_memory]) - del self.process_stats[pid][cpu_or_memory] + metric.add_metric(label_values, cpu_usages[pid][cpu_or_memory]) + # Don't modify the original data except (KeyError, TypeError, IndexError): pass def collect(self): - stats = self.process_stats # Assign self.process_stats to local variable stats + # Work with a copy of the complete stats + stats = self.complete_stats.copy() + # Create a local copy of CPU usages to work with + cpu_usages = {} try: - self.process_stats = stats["cpu_usages"] - except KeyError: + cpu_usages = stats.get("cpu_usages", {}).copy() + except (KeyError, AttributeError): pass # process stats for cameras, detectors and other - cpu_usages = GaugeMetricFamily( + cpu_usages_metric = GaugeMetricFamily( "frigate_cpu_usage_percent", "Process CPU usage %", labels=["pid", "name", "process", "type", "cmdline"], @@ -121,25 +126,34 @@ class CustomCollector(object): self.add_metric(skipped_fps, [camera_name], camera_stats, "skipped_fps") self.add_metric_process( - cpu_usages, + cpu_usages_metric, camera_stats, camera_name, "ffmpeg_pid", "ffmpeg", "cpu", "Camera", + cpu_usages, ) self.add_metric_process( - cpu_usages, + cpu_usages_metric, camera_stats, camera_name, "capture_pid", "capture", "cpu", "Camera", + cpu_usages, ) self.add_metric_process( - cpu_usages, camera_stats, camera_name, "pid", "detect", "cpu", "Camera" + cpu_usages_metric, + camera_stats, + camera_name, + "pid", + "detect", + "cpu", + "Camera", + cpu_usages, ) self.add_metric_process( @@ -150,6 +164,7 @@ class CustomCollector(object): "ffmpeg", "mem", "Camera", + cpu_usages, ) self.add_metric_process( mem_usages, @@ -159,9 +174,17 @@ class CustomCollector(object): "capture", "mem", "Camera", + cpu_usages, ) self.add_metric_process( - mem_usages, camera_stats, camera_name, "pid", "detect", "mem", "Camera" + mem_usages, + camera_stats, + camera_name, + "pid", + "detect", + "mem", + "Camera", + cpu_usages, ) yield audio_dBFS @@ -239,13 +262,14 @@ class CustomCollector(object): "detection_start", ) self.add_metric_process( - cpu_usages, + cpu_usages_metric, stats["detectors"], detector_name, "pid", "detect", "cpu", "Detector", + cpu_usages, ) self.add_metric_process( mem_usages, @@ -255,6 +279,7 @@ class CustomCollector(object): "detect", "mem", "Detector", + cpu_usages, ) except KeyError: pass @@ -272,10 +297,10 @@ class CustomCollector(object): label.append(detector_name) # name label label.append(detector_name) # process label label.append("detectors") # type label - label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label - self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu") - self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem") - del self.process_stats[p_pid] + label.append(cpu_usages[p_pid]["cmdline"]) # cmdline label + self.add_metric(cpu_usages_metric, label, cpu_usages[p_pid], "cpu") + self.add_metric(mem_usages, label, cpu_usages[p_pid], "mem") + # Don't modify the original data except KeyError: pass @@ -292,10 +317,10 @@ class CustomCollector(object): label.append(process_name) # name label label.append(process_name) # process label label.append(process_name) # type label - label.append(self.process_stats[p_pid]["cmdline"]) # cmdline label - self.add_metric(cpu_usages, label, self.process_stats[p_pid], "cpu") - self.add_metric(mem_usages, label, self.process_stats[p_pid], "mem") - del self.process_stats[p_pid] + label.append(cpu_usages[p_pid]["cmdline"]) # cmdline label + self.add_metric(cpu_usages_metric, label, cpu_usages[p_pid], "cpu") + self.add_metric(mem_usages, label, cpu_usages[p_pid], "mem") + # Don't modify the original data except KeyError: pass @@ -304,7 +329,7 @@ class CustomCollector(object): # remaining process stats try: - for process_id, pid_stats in self.process_stats.items(): + for process_id, pid_stats in cpu_usages.items(): label = [process_id] # pid label try: # new frigate:0.13.0-beta3 stat 'cmdline' @@ -314,12 +339,12 @@ class CustomCollector(object): label.append(pid_stats["cmdline"]) # cmdline label except KeyError: pass - self.add_metric(cpu_usages, label, pid_stats, "cpu") + self.add_metric(cpu_usages_metric, label, pid_stats, "cpu") self.add_metric(mem_usages, label, pid_stats, "mem") except KeyError: pass - yield cpu_usages + yield cpu_usages_metric yield mem_usages # gpu stats @@ -481,10 +506,13 @@ REGISTRY.register(collector) def update_metrics(stats): """Updates the Prometheus metrics with the given stats data.""" try: - collector.process_stats = stats # Directly assign the stats data - # Important: Since we are not fetching from URL, we need to manually call collect - for _ in collector.collect(): - pass + # Store the complete stats for later use by collect() + collector.complete_stats = stats.copy() + + # For backwards compatibility + collector.process_stats = stats.copy() + + # No need to call collect() here - it will be called by get_metrics() except Exception as e: logging.error(f"Error updating metrics: {e}")