From 848502344280734f26f17c1820077bcc5220c52e Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 12 Jun 2025 12:12:34 -0600 Subject: [PATCH] Use Fork-Server As Spawn Method (#18682) * Set runtime * Use count correctly * Don't assume camera sizes * Use separate zmq proxy for object detection * Correct order * Use forkserver * Only store PID instead of entire process reference * Cleanup * Catch correct errors * Fix typing * Remove before_run from process util The before_run never actually ran because: You're right to suspect an issue with before_run not being called and a potential deadlock. The way you've implemented the run_wrapper using __getattribute__ for the run method of BaseProcess is a common pitfall in Python's multiprocessing, especially when combined with how multiprocessing.Process works internally. Here's a breakdown of why before_run isn't being called and why you might be experiencing a deadlock: The Problem: __getattribute__ and Process Serialization When you create a multiprocessing.Process object and call start(), the multiprocessing module needs to serialize the process object (or at least enough of it to re-create the process in the new interpreter). It then pickles this serialized object and sends it to the newly spawned process. The issue with your __getattribute__ implementation for run is that: run is retrieved during serialization: When multiprocessing tries to pickle your Process object to send to the new process, it will likely access the run attribute. This triggers your __getattribute__ wrapper, which then tries to bind run_wrapper to self. run_wrapper is bound to the parent process's self: The run_wrapper closure, when created in the parent process, captures the self (the Process instance) from the parent's memory space. Deserialization creates a new object: In the child process, a new Process object is created by deserializing the pickled data. However, the run_wrapper method that was pickled still holds a reference to the self from the parent process. This is a subtle but critical distinction. The child's run is not your wrapped run: When the child process starts, it internally calls its own run method. Because of the serialization and deserialization process, the run method that's ultimately executed in the child process is the original multiprocessing.Process.run or the Process.run if you had directly overridden it. Your __getattribute__ magic, which wraps run, isn't correctly applied to the Process object within the child's context. * Cleanup * Logging bugfix (#18465) * use mp Manager to handle logging queues A Python bug (https://github.com/python/cpython/issues/91555) was preventing logs from the embeddings maintainer process from printing. The bug is fixed in Python 3.14, but a viable workaround is to use the multiprocessing Manager, which better manages mp queues and causes the logging to work correctly. * consolidate * fix typing * Fix typing * Use global log queue * Move to using process for logging * Convert camera tracking to process * Add more processes * Finalize process * Cleanup * Cleanup typing * Formatting * Remove daemon --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> --- frigate/__main__.py | 7 +- frigate/app.py | 61 ++-- frigate/camera/__init__.py | 32 +-- frigate/camera/maintainer.py | 65 ++--- frigate/comms/object_detector_signaler.py | 85 +++++- frigate/data_processing/types.py | 34 +-- frigate/embeddings/__init__.py | 58 ++-- frigate/embeddings/maintainer.py | 23 +- frigate/events/audio.py | 9 +- frigate/log.py | 13 +- frigate/object_detection/base.py | 139 +++++---- frigate/output/output.py | 330 +++++++++++----------- frigate/record/record.py | 60 ++-- frigate/review/review.py | 37 +-- frigate/stats/util.py | 10 +- frigate/util/builtin.py | 13 +- frigate/util/process.py | 29 +- frigate/video.py | 208 +++++++------- web/src/views/system/CameraMetrics.tsx | 2 +- 19 files changed, 611 insertions(+), 604 deletions(-) diff --git a/frigate/__main__.py b/frigate/__main__.py index 4c732be80..6dd5d130e 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,5 +1,6 @@ import argparse import faulthandler +import multiprocessing as mp import signal import sys import threading @@ -15,10 +16,11 @@ from frigate.util.config import find_config_file def main() -> None: + manager = mp.Manager() faulthandler.enable() # Setup the logging thread - setup_logging() + setup_logging(manager) threading.current_thread().name = "frigate" @@ -108,8 +110,9 @@ def main() -> None: sys.exit(0) # Run the main application. - FrigateApp(config).start() + FrigateApp(config, manager).start() if __name__ == "__main__": + mp.set_start_method("forkserver", force=True) main() diff --git a/frigate/app.py b/frigate/app.py index cccbce53e..f78bd561c 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,6 +5,7 @@ import os import secrets import shutil from multiprocessing import Queue +from multiprocessing.managers import DictProxy, SyncManager from multiprocessing.synchronize import Event as MpEvent from typing import Optional @@ -13,7 +14,6 @@ import uvicorn from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase -import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics @@ -23,6 +23,7 @@ from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient +from frigate.comms.object_detector_signaler import DetectorProxy from frigate.comms.webpush import WebPushClient from frigate.comms.ws import WebSocketClient from frigate.comms.zmq_proxy import ZmqProxy @@ -40,10 +41,11 @@ from frigate.const import ( ) from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.embeddings import EmbeddingsContext, manage_embeddings +from frigate.embeddings import EmbeddingProcess, EmbeddingsContext from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.maintainer import EventProcessor +from frigate.log import _stop_logging from frigate.models import ( Event, Export, @@ -56,13 +58,13 @@ from frigate.models import ( User, ) from frigate.object_detection.base import ObjectDetectProcess -from frigate.output.output import output_frames +from frigate.output.output import OutputProcess from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports -from frigate.record.record import manage_recordings -from frigate.review.review import manage_review_segments +from frigate.record.record import RecordProcess +from frigate.review.review import ReviewProcess from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer @@ -77,16 +79,19 @@ logger = logging.getLogger(__name__) class FrigateApp: - def __init__(self, config: FrigateConfig) -> None: + def __init__(self, config: FrigateConfig, manager: SyncManager) -> None: + self.metrics_manager = manager self.audio_process: Optional[mp.Process] = None self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() - self.camera_metrics: dict[str, CameraMetrics] = {} + self.camera_metrics: DictProxy = self.metrics_manager.dict() self.embeddings_metrics: DataProcessorMetrics | None = ( - DataProcessorMetrics(list(config.classification.custom.keys())) + DataProcessorMetrics( + self.metrics_manager, list(config.classification.custom.keys()) + ) if ( config.semantic_search.enabled or config.lpr.enabled @@ -124,7 +129,7 @@ class FrigateApp: def init_camera_metrics(self) -> None: # create camera_metrics for camera_name in self.config.cameras.keys(): - self.camera_metrics[camera_name] = CameraMetrics() + self.camera_metrics[camera_name] = CameraMetrics(self.metrics_manager) self.ptz_metrics[camera_name] = PTZMetrics( autotracker_enabled=self.config.cameras[ camera_name @@ -218,24 +223,14 @@ class FrigateApp: self.processes["go2rtc"] = proc.info["pid"] def init_recording_manager(self) -> None: - recording_process = util.Process( - target=manage_recordings, - name="recording_manager", - args=(self.config,), - ) - recording_process.daemon = True + recording_process = RecordProcess(self.config) self.recording_process = recording_process recording_process.start() self.processes["recording"] = recording_process.pid or 0 logger.info(f"Recording process started: {recording_process.pid}") def init_review_segment_manager(self) -> None: - review_segment_process = util.Process( - target=manage_review_segments, - name="review_segment_manager", - args=(self.config,), - ) - review_segment_process.daemon = True + review_segment_process = ReviewProcess(self.config) self.review_segment_process = review_segment_process review_segment_process.start() self.processes["review_segment"] = review_segment_process.pid or 0 @@ -254,15 +249,10 @@ class FrigateApp: ): return - embedding_process = util.Process( - target=manage_embeddings, - name="embeddings_manager", - args=( - self.config, - self.embeddings_metrics, - ), + embedding_process = EmbeddingProcess( + self.config, + self.embeddings_metrics, ) - embedding_process.daemon = True self.embedding_process = embedding_process embedding_process.start() self.processes["embeddings"] = embedding_process.pid or 0 @@ -330,6 +320,7 @@ class FrigateApp: self.inter_config_updater = CameraConfigUpdatePublisher() self.event_metadata_updater = EventMetadataPublisher() self.inter_zmq_proxy = ZmqProxy() + self.detection_proxy = DetectorProxy() def init_onvif(self) -> None: self.onvif_controller = OnvifController(self.config, self.ptz_metrics) @@ -418,12 +409,7 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = util.Process( - target=output_frames, - name="output_processor", - args=(self.config,), - ) - output_processor.daemon = True + output_processor = OutputProcess(self.config) self.output_processor = output_processor output_processor.start() logger.info(f"Output process started: {output_processor.pid}") @@ -554,11 +540,11 @@ class FrigateApp: self.init_recording_manager() self.init_review_segment_manager() self.init_go2rtc() - self.start_detectors() self.init_embeddings_manager() self.bind_database() self.check_db_data_migrations() self.init_inter_process_communicator() + self.start_detectors() self.init_dispatcher() self.init_embeddings_client() self.start_video_output_processor() @@ -661,10 +647,13 @@ class FrigateApp: self.inter_config_updater.stop() self.event_metadata_updater.stop() self.inter_zmq_proxy.stop() + self.detection_proxy.stop() while len(self.detection_shms) > 0: shm = self.detection_shms.pop() shm.close() shm.unlink() + _stop_logging() + self.metrics_manager.shutdown() os._exit(os.EX_OK) diff --git a/frigate/camera/__init__.py b/frigate/camera/__init__.py index 456751c52..77b1fd424 100644 --- a/frigate/camera/__init__.py +++ b/frigate/camera/__init__.py @@ -1,7 +1,7 @@ import multiprocessing as mp +from multiprocessing.managers import SyncManager from multiprocessing.sharedctypes import Synchronized from multiprocessing.synchronize import Event -from typing import Optional class CameraMetrics: @@ -16,25 +16,25 @@ class CameraMetrics: frame_queue: mp.Queue - process: Optional[mp.Process] - capture_process: Optional[mp.Process] + process_pid: Synchronized + capture_process_pid: Synchronized ffmpeg_pid: Synchronized - def __init__(self): - self.camera_fps = mp.Value("d", 0) - self.detection_fps = mp.Value("d", 0) - self.detection_frame = mp.Value("d", 0) - self.process_fps = mp.Value("d", 0) - self.skipped_fps = mp.Value("d", 0) - self.read_start = mp.Value("d", 0) - self.audio_rms = mp.Value("d", 0) - self.audio_dBFS = mp.Value("d", 0) + def __init__(self, manager: SyncManager): + self.camera_fps = manager.Value("d", 0) + self.detection_fps = manager.Value("d", 0) + self.detection_frame = manager.Value("d", 0) + self.process_fps = manager.Value("d", 0) + self.skipped_fps = manager.Value("d", 0) + self.read_start = manager.Value("d", 0) + self.audio_rms = manager.Value("d", 0) + self.audio_dBFS = manager.Value("d", 0) - self.frame_queue = mp.Queue(maxsize=2) + self.frame_queue = manager.Queue(maxsize=2) - self.process = None - self.capture_process = None - self.ffmpeg_pid = mp.Value("i", 0) + self.process_pid = manager.Value("i", 0) + self.capture_process_pid = manager.Value("i", 0) + self.ffmpeg_pid = manager.Value("i", 0) class PTZMetrics: diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index 6abeb762e..dd978bbfc 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -1,10 +1,12 @@ """Create and maintain camera processes / management.""" import logging +import multiprocessing as mp import os import shutil import threading from multiprocessing import Queue +from multiprocessing.managers import DictProxy from multiprocessing.synchronize import Event as MpEvent from frigate.camera import CameraMetrics, PTZMetrics @@ -16,11 +18,10 @@ from frigate.config.camera.updater import ( ) from frigate.const import SHM_FRAMES_VAR from frigate.models import Regions -from frigate.util import Process as FrigateProcess from frigate.util.builtin import empty_and_close_queue from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory from frigate.util.object import get_camera_regions_grid -from frigate.video import capture_camera, track_camera +from frigate.video import CameraCapture, CameraTracker logger = logging.getLogger(__name__) @@ -31,7 +32,7 @@ class CameraMaintainer(threading.Thread): config: FrigateConfig, detection_queue: Queue, detected_frames_queue: Queue, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, ptz_metrics: dict[str, PTZMetrics], stop_event: MpEvent, ): @@ -53,6 +54,8 @@ class CameraMaintainer(threading.Thread): ], ) self.shm_count = self.__calculate_shm_frame_count() + self.camera_processes: dict[str, mp.Process] = {} + self.capture_processes: dict[str, mp.Process] = {} def __init_historical_regions(self) -> None: # delete region grids for removed or renamed cameras @@ -94,7 +97,7 @@ class CameraMaintainer(threading.Thread): # leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them. cam_total_frame_size += 2 * round( - (camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576, + (1280 * 720 * 1.5 + 270480) / 1048576, 1, ) @@ -151,24 +154,19 @@ class CameraMaintainer(threading.Thread): except FileExistsError: pass - camera_process = FrigateProcess( - target=track_camera, - name=f"camera_processor:{name}", - args=( - config.name, - config, - self.config.model, - self.config.model.merged_labelmap, - self.detection_queue, - self.detected_frames_queue, - self.camera_metrics[name], - self.ptz_metrics[name], - self.region_grids[name], - ), - daemon=True, + camera_process = CameraTracker( + config, + self.config.model, + self.config.model.merged_labelmap, + self.detection_queue, + self.detected_frames_queue, + self.camera_metrics[name], + self.ptz_metrics[name], + self.region_grids[name], ) - self.camera_metrics[config.name].process = camera_process + self.camera_processes[config.name] = camera_process camera_process.start() + self.camera_metrics[config.name].process_pid.value = camera_process.pid logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") def __start_camera_capture( @@ -179,36 +177,33 @@ class CameraMaintainer(threading.Thread): return # pre-create shms - for i in range(10 if runtime else self.shm_count): + count = 10 if runtime else self.shm_count + for i in range(count): frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] self.frame_manager.create(f"{config.name}_frame{i}", frame_size) - capture_process = FrigateProcess( - target=capture_camera, - name=f"camera_capture:{name}", - args=(config, self.shm_count, self.camera_metrics[name]), - ) + capture_process = CameraCapture(config, count, self.camera_metrics[name]) capture_process.daemon = True - self.camera_metrics[name].capture_process = capture_process + self.capture_processes[name] = capture_process capture_process.start() + self.camera_metrics[name].capture_process_pid.value = capture_process.pid logger.info(f"Capture process started for {name}: {capture_process.pid}") def __stop_camera_capture_process(self, camera: str) -> None: - capture_process = self.camera_metrics[camera].capture_process + capture_process = self.capture_processes[camera] if capture_process is not None: logger.info(f"Waiting for capture process for {camera} to stop") capture_process.terminate() capture_process.join() def __stop_camera_process(self, camera: str) -> None: - metrics = self.camera_metrics[camera] - camera_process = metrics.process + camera_process = self.camera_processes[camera] if camera_process is not None: logger.info(f"Waiting for process for {camera} to stop") camera_process.terminate() camera_process.join() logger.info(f"Closing frame queue for {camera}") - empty_and_close_queue(metrics.frame_queue) + empty_and_close_queue(self.camera_metrics[camera].frame_queue) def run(self): self.__init_historical_regions() @@ -230,18 +225,20 @@ class CameraMaintainer(threading.Thread): runtime=True, ) self.__start_camera_capture( - camera, self.update_subscriber.camera_configs[camera] + camera, + self.update_subscriber.camera_configs[camera], + runtime=True, ) elif update_type == CameraConfigUpdateEnum.remove.name: self.__stop_camera_capture_process(camera) self.__stop_camera_process(camera) # ensure the capture processes are done - for camera in self.camera_metrics.keys(): + for camera in self.camera_processes.keys(): self.__stop_camera_capture_process(camera) # ensure the camera processors are done - for camera in self.camera_metrics.keys(): + for camera in self.capture_processes.keys(): self.__stop_camera_process(camera) self.update_subscriber.stop() diff --git a/frigate/comms/object_detector_signaler.py b/frigate/comms/object_detector_signaler.py index befc83e4d..e8871db1a 100644 --- a/frigate/comms/object_detector_signaler.py +++ b/frigate/comms/object_detector_signaler.py @@ -1,21 +1,92 @@ """Facilitates communication between processes for object detection signals.""" -from .zmq_proxy import Publisher, Subscriber +import threading + +import zmq + +SOCKET_PUB = "ipc:///tmp/cache/detector_pub" +SOCKET_SUB = "ipc:///tmp/cache/detector_sub" -class ObjectDetectorPublisher(Publisher): +class ZmqProxyRunner(threading.Thread): + def __init__(self, context: zmq.Context[zmq.Socket]) -> None: + super().__init__(name="detector_proxy") + self.context = context + + def run(self) -> None: + """Run the proxy.""" + incoming = self.context.socket(zmq.XSUB) + incoming.bind(SOCKET_PUB) + outgoing = self.context.socket(zmq.XPUB) + outgoing.bind(SOCKET_SUB) + + # Blocking: This will unblock (via exception) when we destroy the context + # The incoming and outgoing sockets will be closed automatically + # when the context is destroyed as well. + try: + zmq.proxy(incoming, outgoing) + except zmq.ZMQError: + pass + + +class DetectorProxy: + """Proxies object detection signals.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.runner = ZmqProxyRunner(self.context) + self.runner.start() + + def stop(self) -> None: + # destroying the context will tell the proxy to stop + self.context.destroy() + self.runner.join() + + +class ObjectDetectorPublisher: """Publishes signal for object detection to different processes.""" topic_base = "object_detector/" + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(SOCKET_PUB) -class ObjectDetectorSubscriber(Subscriber): + def publish(self, sub_topic: str = "") -> None: + """Publish message.""" + self.socket.send_string(f"{self.topic}{sub_topic}/") + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class ObjectDetectorSubscriber: """Simplifies receiving a signal for object detection.""" topic_base = "object_detector/" - def __init__(self, topic: str) -> None: - super().__init__(topic) + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}/" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic) + self.socket.connect(SOCKET_SUB) - def check_for_update(self): - return super().check_for_update(timeout=5) + def check_for_update(self, timeout: float = 5) -> str | None: + """Returns message or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + return self.socket.recv_string(flags=zmq.NOBLOCK) + except zmq.ZMQError: + pass + + return None + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py index 50f1ed561..d18a1175a 100644 --- a/frigate/data_processing/types.py +++ b/frigate/data_processing/types.py @@ -1,7 +1,7 @@ """Embeddings types.""" -import multiprocessing as mp from enum import Enum +from multiprocessing.managers import SyncManager from multiprocessing.sharedctypes import Synchronized import sherpa_onnx @@ -20,25 +20,27 @@ class DataProcessorMetrics: alpr_pps: Synchronized yolov9_lpr_speed: Synchronized yolov9_lpr_pps: Synchronized - classification_speeds: dict[str, Synchronized] = {} - classification_cps: dict[str, Synchronized] = {} + classification_speeds: dict[str, Synchronized] + classification_cps: dict[str, Synchronized] - def __init__(self, custom_classification_models: list[str]): - self.image_embeddings_speed = mp.Value("d", 0.0) - self.image_embeddings_eps = mp.Value("d", 0.0) - self.text_embeddings_speed = mp.Value("d", 0.0) - self.text_embeddings_eps = mp.Value("d", 0.0) - self.face_rec_speed = mp.Value("d", 0.0) - self.face_rec_fps = mp.Value("d", 0.0) - self.alpr_speed = mp.Value("d", 0.0) - self.alpr_pps = mp.Value("d", 0.0) - self.yolov9_lpr_speed = mp.Value("d", 0.0) - self.yolov9_lpr_pps = mp.Value("d", 0.0) + def __init__(self, manager: SyncManager, custom_classification_models: list[str]): + self.image_embeddings_speed = manager.Value("d", 0.0) + self.image_embeddings_eps = manager.Value("d", 0.0) + self.text_embeddings_speed = manager.Value("d", 0.0) + self.text_embeddings_eps = manager.Value("d", 0.0) + self.face_rec_speed = manager.Value("d", 0.0) + self.face_rec_fps = manager.Value("d", 0.0) + self.alpr_speed = manager.Value("d", 0.0) + self.alpr_pps = manager.Value("d", 0.0) + self.yolov9_lpr_speed = manager.Value("d", 0.0) + self.yolov9_lpr_pps = manager.Value("d", 0.0) + self.classification_speeds = manager.dict() + self.classification_cps = manager.dict() if custom_classification_models: for key in custom_classification_models: - self.classification_speeds[key] = mp.Value("d", 0.0) - self.classification_cps[key] = mp.Value("d", 0.0) + self.classification_speeds[key] = manager.Value("d", 0.0) + self.classification_cps[key] = manager.Value("d", 0.0) class DataProcessorModelRunner: diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 80832369c..9870c9460 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -3,26 +3,22 @@ import base64 import json import logging -import multiprocessing as mp import os -import signal import threading -from types import FrameType -from typing import Any, Optional, Union +from typing import Any, Union import regex from pathvalidate import ValidationError, sanitize_filename -from setproctitle import setproctitle from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.models import Event, Recordings +from frigate.models import Event +from frigate.util import Process as FrigateProcess from frigate.util.builtin import serialize from frigate.util.classification import kickoff_model_training -from frigate.util.services import listen from .maintainer import EmbeddingMaintainer from .util import ZScoreNormalization @@ -30,40 +26,22 @@ from .util import ZScoreNormalization logger = logging.getLogger(__name__) -def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None: - stop_event = mp.Event() +class EmbeddingProcess(FrigateProcess): + def __init__( + self, config: FrigateConfig, metrics: DataProcessorMetrics | None + ) -> None: + super().__init__(name="frigate.embeddings_manager", daemon=True) + self.config = config + self.metrics = metrics - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:embeddings_manager" - setproctitle("frigate.embeddings_manager") - listen() - - # Configure Frigate DB - db = SqliteVecQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - load_vec_extension=True, - ) - models = [Event, Recordings] - db.bind(models) - - maintainer = EmbeddingMaintainer( - db, - config, - metrics, - stop_event, - ) - maintainer.start() + def run(self) -> None: + self.pre_run_setup() + maintainer = EmbeddingMaintainer( + self.config, + self.metrics, + self.stop_event, + ) + maintainer.start() class EmbeddingsContext: diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 0980a8ae8..c659d04fe 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -12,7 +12,6 @@ from typing import Any, Optional import cv2 import numpy as np from peewee import DoesNotExist -from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder @@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum +from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client -from frigate.models import Event +from frigate.models import Event, Recordings from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import ( @@ -82,9 +82,8 @@ class EmbeddingMaintainer(threading.Thread): def __init__( self, - db: SqliteQueueDatabase, config: FrigateConfig, - metrics: DataProcessorMetrics, + metrics: DataProcessorMetrics | None, stop_event: MpEvent, ) -> None: super().__init__(name="embeddings_maintainer") @@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread): [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove], ) + # Configure Frigate DB + db = SqliteVecQueueDatabase( + config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in config.cameras.values() if c.enabled]) + ), + load_vec_extension=True, + ) + models = [Event, Recordings] + db.bind(models) + if config.semantic_search.enabled: self.embeddings = Embeddings(config, db, metrics) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 797a767ba..9152428fa 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -6,12 +6,12 @@ import random import string import threading import time +from multiprocessing.managers import DictProxy from typing import Any, Tuple import numpy as np import frigate.util as util -from frigate.camera import CameraMetrics from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, @@ -83,7 +83,7 @@ class AudioProcessor(util.Process): self, config: FrigateConfig, cameras: list[CameraConfig], - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, ): super().__init__(name="frigate.audio_manager", daemon=True) @@ -93,7 +93,7 @@ class AudioProcessor(util.Process): if any( [ - conf.audio_transcription.enabled_in_config + conf.audio_transcription.enabled_in_config == True for conf in config.cameras.values() ] ): @@ -105,6 +105,7 @@ class AudioProcessor(util.Process): self.transcription_model_runner = None def run(self) -> None: + self.pre_run_setup() audio_threads: list[AudioEventMaintainer] = [] threading.current_thread().name = "process:audio_manager" @@ -146,7 +147,7 @@ class AudioEventMaintainer(threading.Thread): self, camera: CameraConfig, config: FrigateConfig, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, audio_transcription_model_runner: AudioTranscriptionModelRunner | None, stop_event: threading.Event, ) -> None: diff --git a/frigate/log.py b/frigate/log.py index 53e9004f5..f535a278c 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -1,11 +1,13 @@ +# In log.py import atexit import logging -import multiprocessing as mp import os import sys import threading from collections import deque from logging.handlers import QueueHandler, QueueListener +from multiprocessing.managers import SyncManager +from queue import Queue from typing import Deque, Optional from frigate.util.builtin import clean_camera_user_pass @@ -32,12 +34,12 @@ LOG_HANDLER.addFilter( ) log_listener: Optional[QueueListener] = None +log_queue: Optional[Queue] = None -def setup_logging() -> None: - global log_listener - - log_queue: mp.Queue = mp.Queue() +def setup_logging(manager: SyncManager) -> None: + global log_listener, log_queue + log_queue = manager.Queue() log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) atexit.register(_stop_logging) @@ -54,7 +56,6 @@ def setup_logging() -> None: def _stop_logging() -> None: global log_listener - if log_listener is not None: log_listener.stop() log_listener = None diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index 86febc6a7..d203e8574 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -1,16 +1,11 @@ import datetime import logging -import multiprocessing as mp -import os import queue -import signal -import threading from abc import ABC, abstractmethod from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent import numpy as np -from setproctitle import setproctitle import frigate.util as util from frigate.comms.object_detector_signaler import ( @@ -25,7 +20,6 @@ from frigate.detectors.detector_config import ( ) from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory -from frigate.util.services import listen from .util import tensor_transform @@ -90,73 +84,75 @@ class LocalObjectDetector(ObjectDetector): return self.detect_api.detect_raw(tensor_input=tensor_input) -def run_detector( - name: str, - detection_queue: Queue, - cameras: list[str], - avg_speed: Value, - start: Value, - detector_config: BaseDetectorConfig, -): - threading.current_thread().name = f"detector:{name}" - logger = logging.getLogger(f"detector.{name}") - logger.info(f"Starting detection process: {os.getpid()}") - setproctitle(f"frigate.detector.{name}") - listen() +class DetectorRunner(util.Process): + def __init__( + self, + name, + detection_queue: Queue, + cameras: list[str], + avg_speed: Value, + start_time: Value, + detector_config: BaseDetectorConfig, + ) -> None: + super().__init__(name=name, daemon=True) + self.detection_queue = detection_queue + self.cameras = cameras + self.avg_speed = avg_speed + self.start_time = start_time + self.detector_config = detector_config + self.outputs: dict = {} - stop_event: MpEvent = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - def create_output_shm(name: str): + def create_output_shm(self, name: str): out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} + self.outputs[name] = {"shm": out_shm, "np": out_np} - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector(detector_config=detector_config) - detector_publisher = ObjectDetectorPublisher() + def run(self) -> None: + self.pre_run_setup() - outputs = {} - for name in cameras: - create_output_shm(name) + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector(detector_config=self.detector_config) + detector_publisher = ObjectDetectorPublisher() - while not stop_event.is_set(): - try: - connection_id = detection_queue.get(timeout=1) - except queue.Empty: - continue - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) + for name in self.cameras: + self.create_output_shm(name) - if input_frame is None: - logger.warning(f"Failed to get frame {connection_id} from SHM") - continue + while not self.stop_event.is_set(): + try: + connection_id = self.detection_queue.get(timeout=1) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, + ( + 1, + self.detector_config.model.height, + self.detector_config.model.width, + 3, + ), + ) - # detect and send the output - start.value = datetime.datetime.now().timestamp() - detections = object_detector.detect_raw(input_frame) - duration = datetime.datetime.now().timestamp() - start.value - frame_manager.close(connection_id) + if input_frame is None: + logger.warning(f"Failed to get frame {connection_id} from SHM") + continue - if connection_id not in outputs: - create_output_shm(connection_id) + # detect and send the output + self.start_time.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - self.start_time.value + frame_manager.close(connection_id) - outputs[connection_id]["np"][:] = detections[:] - signal_id = f"{connection_id}/update" - detector_publisher.publish(signal_id, signal_id) - start.value = 0.0 + if connection_id not in self.outputs: + self.create_output_shm(connection_id) - avg_speed.value = (avg_speed.value * 9 + duration) / 10 + self.outputs[connection_id]["np"][:] = detections[:] + detector_publisher.publish(connection_id) + self.start_time.value = 0.0 - detector_publisher.stop() - logger.info("Exited detection process...") + self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10 + + detector_publisher.stop() + logger.info("Exited detection process...") class ObjectDetectProcess: @@ -193,19 +189,14 @@ class ObjectDetectProcess: self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() - self.detect_process = util.Process( - target=run_detector, - name=f"detector:{self.name}", - args=( - self.name, - self.detection_queue, - self.cameras, - self.avg_inference_speed, - self.detection_start, - self.detector_config, - ), + self.detect_process = DetectorRunner( + f"detector:{self.name}", + self.detection_queue, + self.cameras, + self.avg_inference_speed, + self.detection_start, + self.detector_config, ) - self.detect_process.daemon = True self.detect_process.start() @@ -231,7 +222,7 @@ class RemoteObjectDetector: ) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) - self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update") + self.detector_subscriber = ObjectDetectorSubscriber(name) def detect(self, tensor_input, threshold=0.4): detections = [] diff --git a/frigate/output/output.py b/frigate/output/output.py index d323596fe..8c60e51c7 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -2,14 +2,11 @@ import datetime import logging -import multiprocessing as mp import os import shutil -import signal import threading from wsgiref.simple_server import make_server -from setproctitle import setproctitle from ws4py.server.wsgirefserver import ( WebSocketWSGIHandler, WebSocketWSGIRequestHandler, @@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +import frigate.util as util from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig @@ -73,189 +71,193 @@ def check_disabled_camera_update( birdseye.all_cameras_disabled() -def output_frames( - config: FrigateConfig, -): - threading.current_thread().name = "output" - setproctitle("frigate.output") +class OutputProcess(util.Process): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.output", daemon=True) + self.config = config - stop_event = mp.Event() + def run(self) -> None: + self.pre_run_setup() - def receiveSignal(signalNumber, frame): - stop_event.set() + frame_manager = SharedMemoryFrameManager() - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - - # start a websocket server on 8082 - WebSocketWSGIHandler.http_version = "1.1" - websocket_server = make_server( - "127.0.0.1", - 8082, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocket), - ) - websocket_server.initialize_websockets_manager() - websocket_thread = threading.Thread(target=websocket_server.serve_forever) - - detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - config_subscriber = CameraConfigUpdateSubscriber( - config, - config.cameras, - [ - CameraConfigUpdateEnum.add, - CameraConfigUpdateEnum.birdseye, - CameraConfigUpdateEnum.enabled, - CameraConfigUpdateEnum.record, - ], - ) - - jsmpeg_cameras: dict[str, JsmpegCamera] = {} - birdseye: Birdseye | None = None - preview_recorders: dict[str, PreviewRecorder] = {} - preview_write_times: dict[str, float] = {} - failed_frame_requests: dict[str, int] = {} - last_disabled_cam_check = datetime.datetime.now().timestamp() - - move_preview_frames("cache") - - for camera, cam_config in config.cameras.items(): - if not cam_config.enabled_in_config: - continue - - jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - if config.birdseye.enabled: - birdseye = Birdseye(config, stop_event, websocket_server) - - websocket_thread.start() - - while not stop_event.is_set(): - # check if there is an updated config - updates = config_subscriber.check_for_updates() - - if "add" in updates: - for camera in updates["add"]: - jsmpeg_cameras[camera] = JsmpegCamera( - cam_config, stop_event, websocket_server - ) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - (topic, data) = detection_subscriber.check_for_update(timeout=1) - now = datetime.datetime.now().timestamp() - - if now - last_disabled_cam_check > 5: - # check disabled cameras every 5 seconds - last_disabled_cam_check = now - check_disabled_camera_update( - config, birdseye, preview_recorders, preview_write_times - ) - - if not topic: - continue - - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - _, - ) = data - - if not config.cameras[camera].enabled: - continue - - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - - if frame is None: - logger.debug(f"Failed to get frame {frame_name} from SHM") - failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - - if failed_frame_requests[camera] > config.cameras[camera].detect.fps: - logger.warning( - f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." - ) - - continue - else: - failed_frame_requests[camera] = 0 - - # send frames for low fps recording - preview_recorders[camera].write_data( - current_tracked_objects, motion_boxes, frame_time, frame + # start a websocket server on 8082 + WebSocketWSGIHandler.http_version = "1.1" + websocket_server = make_server( + "127.0.0.1", + 8082, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocket), ) - preview_write_times[camera] = frame_time + websocket_server.initialize_websockets_manager() + websocket_thread = threading.Thread(target=websocket_server.serve_forever) - # send camera frame to ffmpeg process if websockets are connected - if any( - ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager - ): - # write to the converter for the camera if clients are listening to the specific camera - jsmpeg_cameras[camera].write_frame(frame.tobytes()) + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) + config_subscriber = CameraConfigUpdateSubscriber( + self.config, + self.config.cameras, + [ + CameraConfigUpdateEnum.add, + CameraConfigUpdateEnum.birdseye, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + ], + ) - # send output data to birdseye if websocket is connected or restreaming - if config.birdseye.enabled and ( - config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager + jsmpeg_cameras: dict[str, JsmpegCamera] = {} + birdseye: Birdseye | None = None + preview_recorders: dict[str, PreviewRecorder] = {} + preview_write_times: dict[str, float] = {} + failed_frame_requests: dict[str, int] = {} + last_disabled_cam_check = datetime.datetime.now().timestamp() + + move_preview_frames("cache") + + for camera, cam_config in self.config.cameras.items(): + if not cam_config.enabled_in_config: + continue + + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server ) - ): - birdseye.write_data( + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + if self.config.birdseye.enabled: + birdseye = Birdseye(self.config, self.stop_event, websocket_server) + + websocket_thread.start() + + while not self.stop_event.is_set(): + # check if there is an updated config + updates = config_subscriber.check_for_updates() + + if "add" in updates: + for camera in updates["add"]: + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server + ) + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + (topic, data) = detection_subscriber.check_for_update(timeout=1) + now = datetime.datetime.now().timestamp() + + if now - last_disabled_cam_check > 5: + # check disabled cameras every 5 seconds + last_disabled_cam_check = now + check_disabled_camera_update( + self.config, birdseye, preview_recorders, preview_write_times + ) + + if not topic: + continue + + ( camera, + frame_name, + frame_time, current_tracked_objects, motion_boxes, - frame_time, - frame, + _, + ) = data + + if not self.config.cameras[camera].enabled: + continue + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv ) - frame_manager.close(frame_name) + if frame is None: + logger.debug(f"Failed to get frame {frame_name} from SHM") + failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - move_preview_frames("clips") + if ( + failed_frame_requests[camera] + > self.config.cameras[camera].detect.fps + ): + logger.warning( + f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." + ) - while True: - (topic, data) = detection_subscriber.check_for_update(timeout=0) + continue + else: + failed_frame_requests[camera] = 0 - if not topic: - break + # send frames for low fps recording + preview_recorders[camera].write_data( + current_tracked_objects, motion_boxes, frame_time, frame + ) + preview_write_times[camera] = frame_time - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data + # send camera frame to ffmpeg process if websockets are connected + if any( + ws.environ["PATH_INFO"].endswith(camera) + for ws in websocket_server.manager + ): + # write to the converter for the camera if clients are listening to the specific camera + jsmpeg_cameras[camera].write_frame(frame.tobytes()) - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - frame_manager.close(frame_name) + # send output data to birdseye if websocket is connected or restreaming + if self.config.birdseye.enabled and ( + self.config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) + ): + birdseye.write_data( + camera, + current_tracked_objects, + motion_boxes, + frame_time, + frame, + ) - detection_subscriber.stop() + frame_manager.close(frame_name) - for jsmpeg in jsmpeg_cameras.values(): - jsmpeg.stop() + move_preview_frames("clips") - for preview in preview_recorders.values(): - preview.stop() + while True: + (topic, data) = detection_subscriber.check_for_update(timeout=0) - if birdseye is not None: - birdseye.stop() + if not topic: + break - config_subscriber.stop() - websocket_server.manager.close_all() - websocket_server.manager.stop() - websocket_server.manager.join() - websocket_server.shutdown() - websocket_thread.join() - logger.info("exiting output process...") + ( + camera, + frame_name, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv + ) + frame_manager.close(frame_name) + + detection_subscriber.stop() + + for jsmpeg in jsmpeg_cameras.values(): + jsmpeg.stop() + + for preview in preview_recorders.values(): + preview.stop() + + if birdseye is not None: + birdseye.stop() + + config_subscriber.stop() + websocket_server.manager.close_all() + websocket_server.manager.stop() + websocket_server.manager.join() + websocket_server.shutdown() + websocket_thread.join() + logger.info("exiting output process...") def move_preview_frames(loc: str): diff --git a/frigate/record/record.py b/frigate/record/record.py index 252b80545..40a943a43 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -1,50 +1,40 @@ """Run recording maintainer and cleanup.""" import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional from playhouse.sqliteq import SqliteQueueDatabase -from setproctitle import setproctitle from frigate.config import FrigateConfig from frigate.models import Recordings, ReviewSegment from frigate.record.maintainer import RecordingMaintainer -from frigate.util.services import listen +from frigate.util import Process as FrigateProcess logger = logging.getLogger(__name__) -def manage_recordings(config: FrigateConfig) -> None: - stop_event = mp.Event() +class RecordProcess(FrigateProcess): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.recording_manager", daemon=True) + self.config = config - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() + def run(self) -> None: + self.pre_run_setup() + db = SqliteQueueDatabase( + self.config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) + ), + ) + models = [ReviewSegment, Recordings] + db.bind(models) - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:recording_manager" - setproctitle("frigate.recording_manager") - listen() - - db = SqliteQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - ) - models = [ReviewSegment, Recordings] - db.bind(models) - - maintainer = RecordingMaintainer( - config, - stop_event, - ) - maintainer.start() + maintainer = RecordingMaintainer( + self.config, + self.stop_event, + ) + maintainer.start() diff --git a/frigate/review/review.py b/frigate/review/review.py index dafa6c802..00910e439 100644 --- a/frigate/review/review.py +++ b/frigate/review/review.py @@ -1,36 +1,23 @@ """Run recording maintainer and cleanup.""" import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional - -from setproctitle import setproctitle +import frigate.util as util from frigate.config import FrigateConfig from frigate.review.maintainer import ReviewSegmentMaintainer -from frigate.util.services import listen logger = logging.getLogger(__name__) -def manage_review_segments(config: FrigateConfig) -> None: - stop_event = mp.Event() +class ReviewProcess(util.Process): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.review_segment_manager", daemon=True) + self.config = config - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:review_segment_manager" - setproctitle("frigate.review_segment_manager") - listen() - - maintainer = ReviewSegmentMaintainer( - config, - stop_event, - ) - maintainer.start() + def run(self) -> None: + self.pre_run_setup() + maintainer = ReviewSegmentMaintainer( + self.config, + self.stop_event, + ) + maintainer.start() diff --git a/frigate/stats/util.py b/frigate/stats/util.py index 5078269eb..dbba0c191 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -5,13 +5,13 @@ import os import shutil import time from json import JSONDecodeError +from multiprocessing.managers import DictProxy from typing import Any, Optional import psutil import requests from requests.exceptions import RequestException -from frigate.camera import CameraMetrics from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.data_processing.types import DataProcessorMetrics @@ -53,7 +53,7 @@ def get_latest_version(config: FrigateConfig) -> str: def stats_init( config: FrigateConfig, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, embeddings_metrics: DataProcessorMetrics | None, detectors: dict[str, ObjectDetectProcess], processes: dict[str, int], @@ -271,10 +271,12 @@ def stats_snapshot( stats["cameras"] = {} for name, camera_stats in camera_metrics.items(): total_detection_fps += camera_stats.detection_fps.value - pid = camera_stats.process.pid if camera_stats.process else None + pid = camera_stats.process_pid.value if camera_stats.process_pid.value else None ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None capture_pid = ( - camera_stats.capture_process.pid if camera_stats.capture_process else None + camera_stats.capture_process_pid.value + if camera_stats.capture_process_pid.value + else None ) stats["cameras"][name] = { "camera_fps": round(camera_stats.camera_fps.value, 2), diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index 0433af18e..90c0f9227 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -341,11 +341,14 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None: def empty_and_close_queue(q: mp.Queue): while True: try: - q.get(block=True, timeout=0.5) - except queue.Empty: - q.close() - q.join_thread() - return + try: + q.get(block=True, timeout=0.5) + except (queue.Empty, EOFError): + q.close() + q.join_thread() + return + except AttributeError: + pass def generate_color_palette(n): diff --git a/frigate/util/process.py b/frigate/util/process.py index ac15539fe..3501e585e 100644 --- a/frigate/util/process.py +++ b/frigate/util/process.py @@ -4,9 +4,8 @@ import multiprocessing as mp import signal import sys import threading -from functools import wraps from logging.handlers import QueueHandler -from typing import Any, Callable, Optional +from typing import Callable, Optional import frigate.log @@ -30,34 +29,12 @@ class BaseProcess(mp.Process): super().start(*args, **kwargs) self.after_start() - def __getattribute__(self, name: str) -> Any: - if name == "run": - run = super().__getattribute__("run") - - @wraps(run) - def run_wrapper(*args, **kwargs): - try: - self.before_run() - return run(*args, **kwargs) - finally: - self.after_run() - - return run_wrapper - - return super().__getattribute__(name) - def before_start(self) -> None: pass def after_start(self) -> None: pass - def before_run(self) -> None: - pass - - def after_run(self) -> None: - pass - class Process(BaseProcess): logger: logging.Logger @@ -73,7 +50,7 @@ class Process(BaseProcess): def before_start(self) -> None: self.__log_queue = frigate.log.log_listener.queue - def before_run(self) -> None: + def pre_run_setup(self) -> None: faulthandler.enable() def receiveSignal(signalNumber, frame): @@ -88,8 +65,6 @@ class Process(BaseProcess): signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) - self.logger = logging.getLogger(self.name) - logging.basicConfig(handlers=[], force=True) logging.getLogger().addHandler(QueueHandler(self.__log_queue)) diff --git a/frigate/video.py b/frigate/video.py index 03377d01a..80deae707 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,9 +1,7 @@ import datetime import logging -import multiprocessing as mp import os import queue -import signal import subprocess as sp import threading import time @@ -12,8 +10,8 @@ from multiprocessing.synchronize import Event as MpEvent from typing import Any import cv2 -from setproctitle import setproctitle +import frigate.util as util from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig @@ -53,7 +51,6 @@ from frigate.util.object import ( is_object_filtered, reduce_detections, ) -from frigate.util.services import listen logger = logging.getLogger(__name__) @@ -318,7 +315,7 @@ class CameraWatchdog(threading.Thread): ffmpeg_cmd, self.logger, self.logpipe, self.frame_size ) self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid - self.capture_thread = CameraCapture( + self.capture_thread = CameraCaptureRunner( self.config, self.shm_frame_count, self.frame_index, @@ -396,7 +393,7 @@ class CameraWatchdog(threading.Thread): return newest_segment_time -class CameraCapture(threading.Thread): +class CameraCaptureRunner(threading.Thread): def __init__( self, config: CameraConfig, @@ -440,103 +437,103 @@ class CameraCapture(threading.Thread): ) -def capture_camera( - config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics -): - stop_event = mp.Event() +class CameraCapture(util.Process): + def __init__( + self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics + ) -> None: + super().__init__(name=f"camera_capture:{config.name}", daemon=True) + self.config = config + self.shm_frame_count = shm_frame_count + self.camera_metrics = camera_metrics - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = f"capture:{config.name}" - setproctitle(f"frigate.capture:{config.name}") - - camera_watchdog = CameraWatchdog( - config, - shm_frame_count, - camera_metrics.frame_queue, - camera_metrics.camera_fps, - camera_metrics.skipped_fps, - camera_metrics.ffmpeg_pid, - stop_event, - ) - camera_watchdog.start() - camera_watchdog.join() + def run(self) -> None: + self.pre_run_setup() + camera_watchdog = CameraWatchdog( + self.config, + self.shm_frame_count, + self.camera_metrics.frame_queue, + self.camera_metrics.camera_fps, + self.camera_metrics.skipped_fps, + self.camera_metrics.ffmpeg_pid, + self.stop_event, + ) + camera_watchdog.start() + camera_watchdog.join() -def track_camera( - name, - config: CameraConfig, - model_config: ModelConfig, - labelmap: dict[int, str], - detection_queue: Queue, - detected_objects_queue, - camera_metrics: CameraMetrics, - ptz_metrics: PTZMetrics, - region_grid: list[list[dict[str, Any]]], -): - stop_event = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = f"process:{name}" - setproctitle(f"frigate.process:{name}") - listen() - - frame_queue = camera_metrics.frame_queue - - frame_shape = config.frame_shape - - motion_detector = ImprovedMotionDetector( - frame_shape, - config.motion, - config.detect.fps, - name=config.name, - ptz_metrics=ptz_metrics, - ) - object_detector = RemoteObjectDetector( - name, labelmap, detection_queue, model_config, stop_event - ) - - object_tracker = NorfairTracker(config, ptz_metrics) - - frame_manager = SharedMemoryFrameManager() - - # create communication for region grid updates - requestor = InterProcessRequestor() - - process_frames( - name, - requestor, - frame_queue, - frame_shape, - model_config, - config, - frame_manager, - motion_detector, - object_detector, - object_tracker, +class CameraTracker(util.Process): + def __init__( + self, + config: CameraConfig, + model_config: ModelConfig, + labelmap: dict[int, str], + detection_queue: Queue, detected_objects_queue, - camera_metrics, - stop_event, - ptz_metrics, - region_grid, - ) + camera_metrics: CameraMetrics, + ptz_metrics: PTZMetrics, + region_grid: list[list[dict[str, Any]]], + ) -> None: + super().__init__(name=f"camera_processor:{config.name}", daemon=True) + self.config = config + self.model_config = model_config + self.labelmap = labelmap + self.detection_queue = detection_queue + self.detected_objects_queue = detected_objects_queue + self.camera_metrics = camera_metrics + self.ptz_metrics = ptz_metrics + self.region_grid = region_grid - # empty the frame queue - logger.info(f"{name}: emptying frame queue") - while not frame_queue.empty(): - (frame_name, _) = frame_queue.get(False) - frame_manager.delete(frame_name) + def run(self) -> None: + self.pre_run_setup() + frame_queue = self.camera_metrics.frame_queue + frame_shape = self.config.frame_shape - logger.info(f"{name}: exiting subprocess") + motion_detector = ImprovedMotionDetector( + frame_shape, + self.config.motion, + self.config.detect.fps, + name=self.config.name, + ptz_metrics=self.ptz_metrics, + ) + object_detector = RemoteObjectDetector( + self.config.name, + self.labelmap, + self.detection_queue, + self.model_config, + self.stop_event, + ) + + object_tracker = NorfairTracker(self.config, self.ptz_metrics) + + frame_manager = SharedMemoryFrameManager() + + # create communication for region grid updates + requestor = InterProcessRequestor() + + process_frames( + requestor, + frame_queue, + frame_shape, + self.model_config, + self.config, + frame_manager, + motion_detector, + object_detector, + object_tracker, + self.detected_objects_queue, + self.camera_metrics, + self.stop_event, + self.ptz_metrics, + self.region_grid, + ) + + # empty the frame queue + logger.info(f"{self.config.name}: emptying frame queue") + while not frame_queue.empty(): + (frame_name, _) = frame_queue.get(False) + frame_manager.delete(frame_name) + + logger.info(f"{self.config.name}: exiting subprocess") def detect( @@ -577,7 +574,6 @@ def detect( def process_frames( - camera_name: str, requestor: InterProcessRequestor, frame_queue: Queue, frame_shape: tuple[int, int], @@ -597,7 +593,7 @@ def process_frames( next_region_update = get_tomorrow_at_time(2) config_subscriber = CameraConfigUpdateSubscriber( None, - {camera_name: camera_config}, + {camera_config.name: camera_config}, [ CameraConfigUpdateEnum.detect, CameraConfigUpdateEnum.enabled, @@ -653,7 +649,9 @@ def process_frames( and prev_enabled != camera_enabled and camera_metrics.frame_queue.empty() ): - logger.debug(f"Camera {camera_name} disabled, clearing tracked objects") + logger.debug( + f"Camera {camera_config.name} disabled, clearing tracked objects" + ) prev_enabled = camera_enabled # Clear norfair's dictionaries @@ -678,7 +676,7 @@ def process_frames( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update ): - region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name) + region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) next_region_update = get_tomorrow_at_time(2) try: @@ -698,7 +696,9 @@ def process_frames( frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1])) if frame is None: - logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.") + logger.debug( + f"{camera_config.name}: frame {frame_time} is not in memory store." + ) continue # look for motion if enabled @@ -937,7 +937,7 @@ def process_frames( ) cv2.imwrite( - f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg", + f"debug/frames/{camera_config.name}-{'{:.6f}'.format(frame_time)}.jpg", bgr_frame, ) # add to the queue if not full @@ -949,7 +949,7 @@ def process_frames( camera_metrics.process_fps.value = fps_tracker.eps() detected_objects_queue.put( ( - camera_name, + camera_config.name, frame_name, frame_time, detections, diff --git a/web/src/views/system/CameraMetrics.tsx b/web/src/views/system/CameraMetrics.tsx index ba2701926..3f5891265 100644 --- a/web/src/views/system/CameraMetrics.tsx +++ b/web/src/views/system/CameraMetrics.tsx @@ -173,7 +173,7 @@ export default function CameraMetrics({ }); series[key]["detect"].data.push({ x: statsIdx, - y: stats.cpu_usages[camStats.pid.toString()].cpu, + y: stats.cpu_usages[camStats.pid?.toString()]?.cpu, }); }); });