diff --git a/frigate/__main__.py b/frigate/__main__.py index 4c732be80..6dd5d130e 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,5 +1,6 @@ import argparse import faulthandler +import multiprocessing as mp import signal import sys import threading @@ -15,10 +16,11 @@ from frigate.util.config import find_config_file def main() -> None: + manager = mp.Manager() faulthandler.enable() # Setup the logging thread - setup_logging() + setup_logging(manager) threading.current_thread().name = "frigate" @@ -108,8 +110,9 @@ def main() -> None: sys.exit(0) # Run the main application. - FrigateApp(config).start() + FrigateApp(config, manager).start() if __name__ == "__main__": + mp.set_start_method("forkserver", force=True) main() diff --git a/frigate/app.py b/frigate/app.py index cccbce53e..f78bd561c 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,6 +5,7 @@ import os import secrets import shutil from multiprocessing import Queue +from multiprocessing.managers import DictProxy, SyncManager from multiprocessing.synchronize import Event as MpEvent from typing import Optional @@ -13,7 +14,6 @@ import uvicorn from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase -import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics @@ -23,6 +23,7 @@ from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient +from frigate.comms.object_detector_signaler import DetectorProxy from frigate.comms.webpush import WebPushClient from frigate.comms.ws import WebSocketClient from frigate.comms.zmq_proxy import ZmqProxy @@ -40,10 +41,11 @@ from frigate.const import ( ) from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.embeddings import EmbeddingsContext, manage_embeddings +from frigate.embeddings import EmbeddingProcess, EmbeddingsContext from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.maintainer import EventProcessor +from frigate.log import _stop_logging from frigate.models import ( Event, Export, @@ -56,13 +58,13 @@ from frigate.models import ( User, ) from frigate.object_detection.base import ObjectDetectProcess -from frigate.output.output import output_frames +from frigate.output.output import OutputProcess from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports -from frigate.record.record import manage_recordings -from frigate.review.review import manage_review_segments +from frigate.record.record import RecordProcess +from frigate.review.review import ReviewProcess from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer @@ -77,16 +79,19 @@ logger = logging.getLogger(__name__) class FrigateApp: - def __init__(self, config: FrigateConfig) -> None: + def __init__(self, config: FrigateConfig, manager: SyncManager) -> None: + self.metrics_manager = manager self.audio_process: Optional[mp.Process] = None self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() - self.camera_metrics: dict[str, CameraMetrics] = {} + self.camera_metrics: DictProxy = self.metrics_manager.dict() self.embeddings_metrics: DataProcessorMetrics | None = ( - DataProcessorMetrics(list(config.classification.custom.keys())) + DataProcessorMetrics( + self.metrics_manager, list(config.classification.custom.keys()) + ) if ( config.semantic_search.enabled or config.lpr.enabled @@ -124,7 +129,7 @@ class FrigateApp: def init_camera_metrics(self) -> None: # create camera_metrics for camera_name in self.config.cameras.keys(): - self.camera_metrics[camera_name] = CameraMetrics() + self.camera_metrics[camera_name] = CameraMetrics(self.metrics_manager) self.ptz_metrics[camera_name] = PTZMetrics( autotracker_enabled=self.config.cameras[ camera_name @@ -218,24 +223,14 @@ class FrigateApp: self.processes["go2rtc"] = proc.info["pid"] def init_recording_manager(self) -> None: - recording_process = util.Process( - target=manage_recordings, - name="recording_manager", - args=(self.config,), - ) - recording_process.daemon = True + recording_process = RecordProcess(self.config) self.recording_process = recording_process recording_process.start() self.processes["recording"] = recording_process.pid or 0 logger.info(f"Recording process started: {recording_process.pid}") def init_review_segment_manager(self) -> None: - review_segment_process = util.Process( - target=manage_review_segments, - name="review_segment_manager", - args=(self.config,), - ) - review_segment_process.daemon = True + review_segment_process = ReviewProcess(self.config) self.review_segment_process = review_segment_process review_segment_process.start() self.processes["review_segment"] = review_segment_process.pid or 0 @@ -254,15 +249,10 @@ class FrigateApp: ): return - embedding_process = util.Process( - target=manage_embeddings, - name="embeddings_manager", - args=( - self.config, - self.embeddings_metrics, - ), + embedding_process = EmbeddingProcess( + self.config, + self.embeddings_metrics, ) - embedding_process.daemon = True self.embedding_process = embedding_process embedding_process.start() self.processes["embeddings"] = embedding_process.pid or 0 @@ -330,6 +320,7 @@ class FrigateApp: self.inter_config_updater = CameraConfigUpdatePublisher() self.event_metadata_updater = EventMetadataPublisher() self.inter_zmq_proxy = ZmqProxy() + self.detection_proxy = DetectorProxy() def init_onvif(self) -> None: self.onvif_controller = OnvifController(self.config, self.ptz_metrics) @@ -418,12 +409,7 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = util.Process( - target=output_frames, - name="output_processor", - args=(self.config,), - ) - output_processor.daemon = True + output_processor = OutputProcess(self.config) self.output_processor = output_processor output_processor.start() logger.info(f"Output process started: {output_processor.pid}") @@ -554,11 +540,11 @@ class FrigateApp: self.init_recording_manager() self.init_review_segment_manager() self.init_go2rtc() - self.start_detectors() self.init_embeddings_manager() self.bind_database() self.check_db_data_migrations() self.init_inter_process_communicator() + self.start_detectors() self.init_dispatcher() self.init_embeddings_client() self.start_video_output_processor() @@ -661,10 +647,13 @@ class FrigateApp: self.inter_config_updater.stop() self.event_metadata_updater.stop() self.inter_zmq_proxy.stop() + self.detection_proxy.stop() while len(self.detection_shms) > 0: shm = self.detection_shms.pop() shm.close() shm.unlink() + _stop_logging() + self.metrics_manager.shutdown() os._exit(os.EX_OK) diff --git a/frigate/camera/__init__.py b/frigate/camera/__init__.py index 456751c52..77b1fd424 100644 --- a/frigate/camera/__init__.py +++ b/frigate/camera/__init__.py @@ -1,7 +1,7 @@ import multiprocessing as mp +from multiprocessing.managers import SyncManager from multiprocessing.sharedctypes import Synchronized from multiprocessing.synchronize import Event -from typing import Optional class CameraMetrics: @@ -16,25 +16,25 @@ class CameraMetrics: frame_queue: mp.Queue - process: Optional[mp.Process] - capture_process: Optional[mp.Process] + process_pid: Synchronized + capture_process_pid: Synchronized ffmpeg_pid: Synchronized - def __init__(self): - self.camera_fps = mp.Value("d", 0) - self.detection_fps = mp.Value("d", 0) - self.detection_frame = mp.Value("d", 0) - self.process_fps = mp.Value("d", 0) - self.skipped_fps = mp.Value("d", 0) - self.read_start = mp.Value("d", 0) - self.audio_rms = mp.Value("d", 0) - self.audio_dBFS = mp.Value("d", 0) + def __init__(self, manager: SyncManager): + self.camera_fps = manager.Value("d", 0) + self.detection_fps = manager.Value("d", 0) + self.detection_frame = manager.Value("d", 0) + self.process_fps = manager.Value("d", 0) + self.skipped_fps = manager.Value("d", 0) + self.read_start = manager.Value("d", 0) + self.audio_rms = manager.Value("d", 0) + self.audio_dBFS = manager.Value("d", 0) - self.frame_queue = mp.Queue(maxsize=2) + self.frame_queue = manager.Queue(maxsize=2) - self.process = None - self.capture_process = None - self.ffmpeg_pid = mp.Value("i", 0) + self.process_pid = manager.Value("i", 0) + self.capture_process_pid = manager.Value("i", 0) + self.ffmpeg_pid = manager.Value("i", 0) class PTZMetrics: diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index 6abeb762e..dd978bbfc 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -1,10 +1,12 @@ """Create and maintain camera processes / management.""" import logging +import multiprocessing as mp import os import shutil import threading from multiprocessing import Queue +from multiprocessing.managers import DictProxy from multiprocessing.synchronize import Event as MpEvent from frigate.camera import CameraMetrics, PTZMetrics @@ -16,11 +18,10 @@ from frigate.config.camera.updater import ( ) from frigate.const import SHM_FRAMES_VAR from frigate.models import Regions -from frigate.util import Process as FrigateProcess from frigate.util.builtin import empty_and_close_queue from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory from frigate.util.object import get_camera_regions_grid -from frigate.video import capture_camera, track_camera +from frigate.video import CameraCapture, CameraTracker logger = logging.getLogger(__name__) @@ -31,7 +32,7 @@ class CameraMaintainer(threading.Thread): config: FrigateConfig, detection_queue: Queue, detected_frames_queue: Queue, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, ptz_metrics: dict[str, PTZMetrics], stop_event: MpEvent, ): @@ -53,6 +54,8 @@ class CameraMaintainer(threading.Thread): ], ) self.shm_count = self.__calculate_shm_frame_count() + self.camera_processes: dict[str, mp.Process] = {} + self.capture_processes: dict[str, mp.Process] = {} def __init_historical_regions(self) -> None: # delete region grids for removed or renamed cameras @@ -94,7 +97,7 @@ class CameraMaintainer(threading.Thread): # leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them. cam_total_frame_size += 2 * round( - (camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576, + (1280 * 720 * 1.5 + 270480) / 1048576, 1, ) @@ -151,24 +154,19 @@ class CameraMaintainer(threading.Thread): except FileExistsError: pass - camera_process = FrigateProcess( - target=track_camera, - name=f"camera_processor:{name}", - args=( - config.name, - config, - self.config.model, - self.config.model.merged_labelmap, - self.detection_queue, - self.detected_frames_queue, - self.camera_metrics[name], - self.ptz_metrics[name], - self.region_grids[name], - ), - daemon=True, + camera_process = CameraTracker( + config, + self.config.model, + self.config.model.merged_labelmap, + self.detection_queue, + self.detected_frames_queue, + self.camera_metrics[name], + self.ptz_metrics[name], + self.region_grids[name], ) - self.camera_metrics[config.name].process = camera_process + self.camera_processes[config.name] = camera_process camera_process.start() + self.camera_metrics[config.name].process_pid.value = camera_process.pid logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") def __start_camera_capture( @@ -179,36 +177,33 @@ class CameraMaintainer(threading.Thread): return # pre-create shms - for i in range(10 if runtime else self.shm_count): + count = 10 if runtime else self.shm_count + for i in range(count): frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] self.frame_manager.create(f"{config.name}_frame{i}", frame_size) - capture_process = FrigateProcess( - target=capture_camera, - name=f"camera_capture:{name}", - args=(config, self.shm_count, self.camera_metrics[name]), - ) + capture_process = CameraCapture(config, count, self.camera_metrics[name]) capture_process.daemon = True - self.camera_metrics[name].capture_process = capture_process + self.capture_processes[name] = capture_process capture_process.start() + self.camera_metrics[name].capture_process_pid.value = capture_process.pid logger.info(f"Capture process started for {name}: {capture_process.pid}") def __stop_camera_capture_process(self, camera: str) -> None: - capture_process = self.camera_metrics[camera].capture_process + capture_process = self.capture_processes[camera] if capture_process is not None: logger.info(f"Waiting for capture process for {camera} to stop") capture_process.terminate() capture_process.join() def __stop_camera_process(self, camera: str) -> None: - metrics = self.camera_metrics[camera] - camera_process = metrics.process + camera_process = self.camera_processes[camera] if camera_process is not None: logger.info(f"Waiting for process for {camera} to stop") camera_process.terminate() camera_process.join() logger.info(f"Closing frame queue for {camera}") - empty_and_close_queue(metrics.frame_queue) + empty_and_close_queue(self.camera_metrics[camera].frame_queue) def run(self): self.__init_historical_regions() @@ -230,18 +225,20 @@ class CameraMaintainer(threading.Thread): runtime=True, ) self.__start_camera_capture( - camera, self.update_subscriber.camera_configs[camera] + camera, + self.update_subscriber.camera_configs[camera], + runtime=True, ) elif update_type == CameraConfigUpdateEnum.remove.name: self.__stop_camera_capture_process(camera) self.__stop_camera_process(camera) # ensure the capture processes are done - for camera in self.camera_metrics.keys(): + for camera in self.camera_processes.keys(): self.__stop_camera_capture_process(camera) # ensure the camera processors are done - for camera in self.camera_metrics.keys(): + for camera in self.capture_processes.keys(): self.__stop_camera_process(camera) self.update_subscriber.stop() diff --git a/frigate/comms/object_detector_signaler.py b/frigate/comms/object_detector_signaler.py index befc83e4d..e8871db1a 100644 --- a/frigate/comms/object_detector_signaler.py +++ b/frigate/comms/object_detector_signaler.py @@ -1,21 +1,92 @@ """Facilitates communication between processes for object detection signals.""" -from .zmq_proxy import Publisher, Subscriber +import threading + +import zmq + +SOCKET_PUB = "ipc:///tmp/cache/detector_pub" +SOCKET_SUB = "ipc:///tmp/cache/detector_sub" -class ObjectDetectorPublisher(Publisher): +class ZmqProxyRunner(threading.Thread): + def __init__(self, context: zmq.Context[zmq.Socket]) -> None: + super().__init__(name="detector_proxy") + self.context = context + + def run(self) -> None: + """Run the proxy.""" + incoming = self.context.socket(zmq.XSUB) + incoming.bind(SOCKET_PUB) + outgoing = self.context.socket(zmq.XPUB) + outgoing.bind(SOCKET_SUB) + + # Blocking: This will unblock (via exception) when we destroy the context + # The incoming and outgoing sockets will be closed automatically + # when the context is destroyed as well. + try: + zmq.proxy(incoming, outgoing) + except zmq.ZMQError: + pass + + +class DetectorProxy: + """Proxies object detection signals.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.runner = ZmqProxyRunner(self.context) + self.runner.start() + + def stop(self) -> None: + # destroying the context will tell the proxy to stop + self.context.destroy() + self.runner.join() + + +class ObjectDetectorPublisher: """Publishes signal for object detection to different processes.""" topic_base = "object_detector/" + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(SOCKET_PUB) -class ObjectDetectorSubscriber(Subscriber): + def publish(self, sub_topic: str = "") -> None: + """Publish message.""" + self.socket.send_string(f"{self.topic}{sub_topic}/") + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class ObjectDetectorSubscriber: """Simplifies receiving a signal for object detection.""" topic_base = "object_detector/" - def __init__(self, topic: str) -> None: - super().__init__(topic) + def __init__(self, topic: str = "") -> None: + self.topic = f"{self.topic_base}{topic}/" + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic) + self.socket.connect(SOCKET_SUB) - def check_for_update(self): - return super().check_for_update(timeout=5) + def check_for_update(self, timeout: float = 5) -> str | None: + """Returns message or None if no update.""" + try: + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + return self.socket.recv_string(flags=zmq.NOBLOCK) + except zmq.ZMQError: + pass + + return None + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py index 50f1ed561..d18a1175a 100644 --- a/frigate/data_processing/types.py +++ b/frigate/data_processing/types.py @@ -1,7 +1,7 @@ """Embeddings types.""" -import multiprocessing as mp from enum import Enum +from multiprocessing.managers import SyncManager from multiprocessing.sharedctypes import Synchronized import sherpa_onnx @@ -20,25 +20,27 @@ class DataProcessorMetrics: alpr_pps: Synchronized yolov9_lpr_speed: Synchronized yolov9_lpr_pps: Synchronized - classification_speeds: dict[str, Synchronized] = {} - classification_cps: dict[str, Synchronized] = {} + classification_speeds: dict[str, Synchronized] + classification_cps: dict[str, Synchronized] - def __init__(self, custom_classification_models: list[str]): - self.image_embeddings_speed = mp.Value("d", 0.0) - self.image_embeddings_eps = mp.Value("d", 0.0) - self.text_embeddings_speed = mp.Value("d", 0.0) - self.text_embeddings_eps = mp.Value("d", 0.0) - self.face_rec_speed = mp.Value("d", 0.0) - self.face_rec_fps = mp.Value("d", 0.0) - self.alpr_speed = mp.Value("d", 0.0) - self.alpr_pps = mp.Value("d", 0.0) - self.yolov9_lpr_speed = mp.Value("d", 0.0) - self.yolov9_lpr_pps = mp.Value("d", 0.0) + def __init__(self, manager: SyncManager, custom_classification_models: list[str]): + self.image_embeddings_speed = manager.Value("d", 0.0) + self.image_embeddings_eps = manager.Value("d", 0.0) + self.text_embeddings_speed = manager.Value("d", 0.0) + self.text_embeddings_eps = manager.Value("d", 0.0) + self.face_rec_speed = manager.Value("d", 0.0) + self.face_rec_fps = manager.Value("d", 0.0) + self.alpr_speed = manager.Value("d", 0.0) + self.alpr_pps = manager.Value("d", 0.0) + self.yolov9_lpr_speed = manager.Value("d", 0.0) + self.yolov9_lpr_pps = manager.Value("d", 0.0) + self.classification_speeds = manager.dict() + self.classification_cps = manager.dict() if custom_classification_models: for key in custom_classification_models: - self.classification_speeds[key] = mp.Value("d", 0.0) - self.classification_cps[key] = mp.Value("d", 0.0) + self.classification_speeds[key] = manager.Value("d", 0.0) + self.classification_cps[key] = manager.Value("d", 0.0) class DataProcessorModelRunner: diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 80832369c..9870c9460 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -3,26 +3,22 @@ import base64 import json import logging -import multiprocessing as mp import os -import signal import threading -from types import FrameType -from typing import Any, Optional, Union +from typing import Any, Union import regex from pathvalidate import ValidationError, sanitize_filename -from setproctitle import setproctitle from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.models import Event, Recordings +from frigate.models import Event +from frigate.util import Process as FrigateProcess from frigate.util.builtin import serialize from frigate.util.classification import kickoff_model_training -from frigate.util.services import listen from .maintainer import EmbeddingMaintainer from .util import ZScoreNormalization @@ -30,40 +26,22 @@ from .util import ZScoreNormalization logger = logging.getLogger(__name__) -def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None: - stop_event = mp.Event() +class EmbeddingProcess(FrigateProcess): + def __init__( + self, config: FrigateConfig, metrics: DataProcessorMetrics | None + ) -> None: + super().__init__(name="frigate.embeddings_manager", daemon=True) + self.config = config + self.metrics = metrics - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:embeddings_manager" - setproctitle("frigate.embeddings_manager") - listen() - - # Configure Frigate DB - db = SqliteVecQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - load_vec_extension=True, - ) - models = [Event, Recordings] - db.bind(models) - - maintainer = EmbeddingMaintainer( - db, - config, - metrics, - stop_event, - ) - maintainer.start() + def run(self) -> None: + self.pre_run_setup() + maintainer = EmbeddingMaintainer( + self.config, + self.metrics, + self.stop_event, + ) + maintainer.start() class EmbeddingsContext: diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 0980a8ae8..c659d04fe 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -12,7 +12,6 @@ from typing import Any, Optional import cv2 import numpy as np from peewee import DoesNotExist -from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder @@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum +from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client -from frigate.models import Event +from frigate.models import Event, Recordings from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import ( @@ -82,9 +82,8 @@ class EmbeddingMaintainer(threading.Thread): def __init__( self, - db: SqliteQueueDatabase, config: FrigateConfig, - metrics: DataProcessorMetrics, + metrics: DataProcessorMetrics | None, stop_event: MpEvent, ) -> None: super().__init__(name="embeddings_maintainer") @@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread): [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove], ) + # Configure Frigate DB + db = SqliteVecQueueDatabase( + config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in config.cameras.values() if c.enabled]) + ), + load_vec_extension=True, + ) + models = [Event, Recordings] + db.bind(models) + if config.semantic_search.enabled: self.embeddings = Embeddings(config, db, metrics) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 797a767ba..9152428fa 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -6,12 +6,12 @@ import random import string import threading import time +from multiprocessing.managers import DictProxy from typing import Any, Tuple import numpy as np import frigate.util as util -from frigate.camera import CameraMetrics from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, @@ -83,7 +83,7 @@ class AudioProcessor(util.Process): self, config: FrigateConfig, cameras: list[CameraConfig], - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, ): super().__init__(name="frigate.audio_manager", daemon=True) @@ -93,7 +93,7 @@ class AudioProcessor(util.Process): if any( [ - conf.audio_transcription.enabled_in_config + conf.audio_transcription.enabled_in_config == True for conf in config.cameras.values() ] ): @@ -105,6 +105,7 @@ class AudioProcessor(util.Process): self.transcription_model_runner = None def run(self) -> None: + self.pre_run_setup() audio_threads: list[AudioEventMaintainer] = [] threading.current_thread().name = "process:audio_manager" @@ -146,7 +147,7 @@ class AudioEventMaintainer(threading.Thread): self, camera: CameraConfig, config: FrigateConfig, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, audio_transcription_model_runner: AudioTranscriptionModelRunner | None, stop_event: threading.Event, ) -> None: diff --git a/frigate/log.py b/frigate/log.py index 53e9004f5..f535a278c 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -1,11 +1,13 @@ +# In log.py import atexit import logging -import multiprocessing as mp import os import sys import threading from collections import deque from logging.handlers import QueueHandler, QueueListener +from multiprocessing.managers import SyncManager +from queue import Queue from typing import Deque, Optional from frigate.util.builtin import clean_camera_user_pass @@ -32,12 +34,12 @@ LOG_HANDLER.addFilter( ) log_listener: Optional[QueueListener] = None +log_queue: Optional[Queue] = None -def setup_logging() -> None: - global log_listener - - log_queue: mp.Queue = mp.Queue() +def setup_logging(manager: SyncManager) -> None: + global log_listener, log_queue + log_queue = manager.Queue() log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) atexit.register(_stop_logging) @@ -54,7 +56,6 @@ def setup_logging() -> None: def _stop_logging() -> None: global log_listener - if log_listener is not None: log_listener.stop() log_listener = None diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index 86febc6a7..d203e8574 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -1,16 +1,11 @@ import datetime import logging -import multiprocessing as mp -import os import queue -import signal -import threading from abc import ABC, abstractmethod from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent import numpy as np -from setproctitle import setproctitle import frigate.util as util from frigate.comms.object_detector_signaler import ( @@ -25,7 +20,6 @@ from frigate.detectors.detector_config import ( ) from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory -from frigate.util.services import listen from .util import tensor_transform @@ -90,73 +84,75 @@ class LocalObjectDetector(ObjectDetector): return self.detect_api.detect_raw(tensor_input=tensor_input) -def run_detector( - name: str, - detection_queue: Queue, - cameras: list[str], - avg_speed: Value, - start: Value, - detector_config: BaseDetectorConfig, -): - threading.current_thread().name = f"detector:{name}" - logger = logging.getLogger(f"detector.{name}") - logger.info(f"Starting detection process: {os.getpid()}") - setproctitle(f"frigate.detector.{name}") - listen() +class DetectorRunner(util.Process): + def __init__( + self, + name, + detection_queue: Queue, + cameras: list[str], + avg_speed: Value, + start_time: Value, + detector_config: BaseDetectorConfig, + ) -> None: + super().__init__(name=name, daemon=True) + self.detection_queue = detection_queue + self.cameras = cameras + self.avg_speed = avg_speed + self.start_time = start_time + self.detector_config = detector_config + self.outputs: dict = {} - stop_event: MpEvent = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - def create_output_shm(name: str): + def create_output_shm(self, name: str): out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} + self.outputs[name] = {"shm": out_shm, "np": out_np} - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector(detector_config=detector_config) - detector_publisher = ObjectDetectorPublisher() + def run(self) -> None: + self.pre_run_setup() - outputs = {} - for name in cameras: - create_output_shm(name) + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector(detector_config=self.detector_config) + detector_publisher = ObjectDetectorPublisher() - while not stop_event.is_set(): - try: - connection_id = detection_queue.get(timeout=1) - except queue.Empty: - continue - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) + for name in self.cameras: + self.create_output_shm(name) - if input_frame is None: - logger.warning(f"Failed to get frame {connection_id} from SHM") - continue + while not self.stop_event.is_set(): + try: + connection_id = self.detection_queue.get(timeout=1) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, + ( + 1, + self.detector_config.model.height, + self.detector_config.model.width, + 3, + ), + ) - # detect and send the output - start.value = datetime.datetime.now().timestamp() - detections = object_detector.detect_raw(input_frame) - duration = datetime.datetime.now().timestamp() - start.value - frame_manager.close(connection_id) + if input_frame is None: + logger.warning(f"Failed to get frame {connection_id} from SHM") + continue - if connection_id not in outputs: - create_output_shm(connection_id) + # detect and send the output + self.start_time.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - self.start_time.value + frame_manager.close(connection_id) - outputs[connection_id]["np"][:] = detections[:] - signal_id = f"{connection_id}/update" - detector_publisher.publish(signal_id, signal_id) - start.value = 0.0 + if connection_id not in self.outputs: + self.create_output_shm(connection_id) - avg_speed.value = (avg_speed.value * 9 + duration) / 10 + self.outputs[connection_id]["np"][:] = detections[:] + detector_publisher.publish(connection_id) + self.start_time.value = 0.0 - detector_publisher.stop() - logger.info("Exited detection process...") + self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10 + + detector_publisher.stop() + logger.info("Exited detection process...") class ObjectDetectProcess: @@ -193,19 +189,14 @@ class ObjectDetectProcess: self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() - self.detect_process = util.Process( - target=run_detector, - name=f"detector:{self.name}", - args=( - self.name, - self.detection_queue, - self.cameras, - self.avg_inference_speed, - self.detection_start, - self.detector_config, - ), + self.detect_process = DetectorRunner( + f"detector:{self.name}", + self.detection_queue, + self.cameras, + self.avg_inference_speed, + self.detection_start, + self.detector_config, ) - self.detect_process.daemon = True self.detect_process.start() @@ -231,7 +222,7 @@ class RemoteObjectDetector: ) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) - self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update") + self.detector_subscriber = ObjectDetectorSubscriber(name) def detect(self, tensor_input, threshold=0.4): detections = [] diff --git a/frigate/output/output.py b/frigate/output/output.py index d323596fe..8c60e51c7 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -2,14 +2,11 @@ import datetime import logging -import multiprocessing as mp import os import shutil -import signal import threading from wsgiref.simple_server import make_server -from setproctitle import setproctitle from ws4py.server.wsgirefserver import ( WebSocketWSGIHandler, WebSocketWSGIRequestHandler, @@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +import frigate.util as util from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig @@ -73,189 +71,193 @@ def check_disabled_camera_update( birdseye.all_cameras_disabled() -def output_frames( - config: FrigateConfig, -): - threading.current_thread().name = "output" - setproctitle("frigate.output") +class OutputProcess(util.Process): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.output", daemon=True) + self.config = config - stop_event = mp.Event() + def run(self) -> None: + self.pre_run_setup() - def receiveSignal(signalNumber, frame): - stop_event.set() + frame_manager = SharedMemoryFrameManager() - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - - # start a websocket server on 8082 - WebSocketWSGIHandler.http_version = "1.1" - websocket_server = make_server( - "127.0.0.1", - 8082, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocket), - ) - websocket_server.initialize_websockets_manager() - websocket_thread = threading.Thread(target=websocket_server.serve_forever) - - detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - config_subscriber = CameraConfigUpdateSubscriber( - config, - config.cameras, - [ - CameraConfigUpdateEnum.add, - CameraConfigUpdateEnum.birdseye, - CameraConfigUpdateEnum.enabled, - CameraConfigUpdateEnum.record, - ], - ) - - jsmpeg_cameras: dict[str, JsmpegCamera] = {} - birdseye: Birdseye | None = None - preview_recorders: dict[str, PreviewRecorder] = {} - preview_write_times: dict[str, float] = {} - failed_frame_requests: dict[str, int] = {} - last_disabled_cam_check = datetime.datetime.now().timestamp() - - move_preview_frames("cache") - - for camera, cam_config in config.cameras.items(): - if not cam_config.enabled_in_config: - continue - - jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - if config.birdseye.enabled: - birdseye = Birdseye(config, stop_event, websocket_server) - - websocket_thread.start() - - while not stop_event.is_set(): - # check if there is an updated config - updates = config_subscriber.check_for_updates() - - if "add" in updates: - for camera in updates["add"]: - jsmpeg_cameras[camera] = JsmpegCamera( - cam_config, stop_event, websocket_server - ) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - (topic, data) = detection_subscriber.check_for_update(timeout=1) - now = datetime.datetime.now().timestamp() - - if now - last_disabled_cam_check > 5: - # check disabled cameras every 5 seconds - last_disabled_cam_check = now - check_disabled_camera_update( - config, birdseye, preview_recorders, preview_write_times - ) - - if not topic: - continue - - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - _, - ) = data - - if not config.cameras[camera].enabled: - continue - - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - - if frame is None: - logger.debug(f"Failed to get frame {frame_name} from SHM") - failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - - if failed_frame_requests[camera] > config.cameras[camera].detect.fps: - logger.warning( - f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." - ) - - continue - else: - failed_frame_requests[camera] = 0 - - # send frames for low fps recording - preview_recorders[camera].write_data( - current_tracked_objects, motion_boxes, frame_time, frame + # start a websocket server on 8082 + WebSocketWSGIHandler.http_version = "1.1" + websocket_server = make_server( + "127.0.0.1", + 8082, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocket), ) - preview_write_times[camera] = frame_time + websocket_server.initialize_websockets_manager() + websocket_thread = threading.Thread(target=websocket_server.serve_forever) - # send camera frame to ffmpeg process if websockets are connected - if any( - ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager - ): - # write to the converter for the camera if clients are listening to the specific camera - jsmpeg_cameras[camera].write_frame(frame.tobytes()) + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) + config_subscriber = CameraConfigUpdateSubscriber( + self.config, + self.config.cameras, + [ + CameraConfigUpdateEnum.add, + CameraConfigUpdateEnum.birdseye, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + ], + ) - # send output data to birdseye if websocket is connected or restreaming - if config.birdseye.enabled and ( - config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager + jsmpeg_cameras: dict[str, JsmpegCamera] = {} + birdseye: Birdseye | None = None + preview_recorders: dict[str, PreviewRecorder] = {} + preview_write_times: dict[str, float] = {} + failed_frame_requests: dict[str, int] = {} + last_disabled_cam_check = datetime.datetime.now().timestamp() + + move_preview_frames("cache") + + for camera, cam_config in self.config.cameras.items(): + if not cam_config.enabled_in_config: + continue + + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server ) - ): - birdseye.write_data( + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + if self.config.birdseye.enabled: + birdseye = Birdseye(self.config, self.stop_event, websocket_server) + + websocket_thread.start() + + while not self.stop_event.is_set(): + # check if there is an updated config + updates = config_subscriber.check_for_updates() + + if "add" in updates: + for camera in updates["add"]: + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server + ) + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + (topic, data) = detection_subscriber.check_for_update(timeout=1) + now = datetime.datetime.now().timestamp() + + if now - last_disabled_cam_check > 5: + # check disabled cameras every 5 seconds + last_disabled_cam_check = now + check_disabled_camera_update( + self.config, birdseye, preview_recorders, preview_write_times + ) + + if not topic: + continue + + ( camera, + frame_name, + frame_time, current_tracked_objects, motion_boxes, - frame_time, - frame, + _, + ) = data + + if not self.config.cameras[camera].enabled: + continue + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv ) - frame_manager.close(frame_name) + if frame is None: + logger.debug(f"Failed to get frame {frame_name} from SHM") + failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1 - move_preview_frames("clips") + if ( + failed_frame_requests[camera] + > self.config.cameras[camera].detect.fps + ): + logger.warning( + f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues." + ) - while True: - (topic, data) = detection_subscriber.check_for_update(timeout=0) + continue + else: + failed_frame_requests[camera] = 0 - if not topic: - break + # send frames for low fps recording + preview_recorders[camera].write_data( + current_tracked_objects, motion_boxes, frame_time, frame + ) + preview_write_times[camera] = frame_time - ( - camera, - frame_name, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data + # send camera frame to ffmpeg process if websockets are connected + if any( + ws.environ["PATH_INFO"].endswith(camera) + for ws in websocket_server.manager + ): + # write to the converter for the camera if clients are listening to the specific camera + jsmpeg_cameras[camera].write_frame(frame.tobytes()) - frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) - frame_manager.close(frame_name) + # send output data to birdseye if websocket is connected or restreaming + if self.config.birdseye.enabled and ( + self.config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) + ): + birdseye.write_data( + camera, + current_tracked_objects, + motion_boxes, + frame_time, + frame, + ) - detection_subscriber.stop() + frame_manager.close(frame_name) - for jsmpeg in jsmpeg_cameras.values(): - jsmpeg.stop() + move_preview_frames("clips") - for preview in preview_recorders.values(): - preview.stop() + while True: + (topic, data) = detection_subscriber.check_for_update(timeout=0) - if birdseye is not None: - birdseye.stop() + if not topic: + break - config_subscriber.stop() - websocket_server.manager.close_all() - websocket_server.manager.stop() - websocket_server.manager.join() - websocket_server.shutdown() - websocket_thread.join() - logger.info("exiting output process...") + ( + camera, + frame_name, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + + frame = frame_manager.get( + frame_name, self.config.cameras[camera].frame_shape_yuv + ) + frame_manager.close(frame_name) + + detection_subscriber.stop() + + for jsmpeg in jsmpeg_cameras.values(): + jsmpeg.stop() + + for preview in preview_recorders.values(): + preview.stop() + + if birdseye is not None: + birdseye.stop() + + config_subscriber.stop() + websocket_server.manager.close_all() + websocket_server.manager.stop() + websocket_server.manager.join() + websocket_server.shutdown() + websocket_thread.join() + logger.info("exiting output process...") def move_preview_frames(loc: str): diff --git a/frigate/record/record.py b/frigate/record/record.py index 252b80545..40a943a43 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -1,50 +1,40 @@ """Run recording maintainer and cleanup.""" import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional from playhouse.sqliteq import SqliteQueueDatabase -from setproctitle import setproctitle from frigate.config import FrigateConfig from frigate.models import Recordings, ReviewSegment from frigate.record.maintainer import RecordingMaintainer -from frigate.util.services import listen +from frigate.util import Process as FrigateProcess logger = logging.getLogger(__name__) -def manage_recordings(config: FrigateConfig) -> None: - stop_event = mp.Event() +class RecordProcess(FrigateProcess): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.recording_manager", daemon=True) + self.config = config - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() + def run(self) -> None: + self.pre_run_setup() + db = SqliteQueueDatabase( + self.config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) + ), + ) + models = [ReviewSegment, Recordings] + db.bind(models) - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:recording_manager" - setproctitle("frigate.recording_manager") - listen() - - db = SqliteQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - ) - models = [ReviewSegment, Recordings] - db.bind(models) - - maintainer = RecordingMaintainer( - config, - stop_event, - ) - maintainer.start() + maintainer = RecordingMaintainer( + self.config, + self.stop_event, + ) + maintainer.start() diff --git a/frigate/review/review.py b/frigate/review/review.py index dafa6c802..00910e439 100644 --- a/frigate/review/review.py +++ b/frigate/review/review.py @@ -1,36 +1,23 @@ """Run recording maintainer and cleanup.""" import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional - -from setproctitle import setproctitle +import frigate.util as util from frigate.config import FrigateConfig from frigate.review.maintainer import ReviewSegmentMaintainer -from frigate.util.services import listen logger = logging.getLogger(__name__) -def manage_review_segments(config: FrigateConfig) -> None: - stop_event = mp.Event() +class ReviewProcess(util.Process): + def __init__(self, config: FrigateConfig) -> None: + super().__init__(name="frigate.review_segment_manager", daemon=True) + self.config = config - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:review_segment_manager" - setproctitle("frigate.review_segment_manager") - listen() - - maintainer = ReviewSegmentMaintainer( - config, - stop_event, - ) - maintainer.start() + def run(self) -> None: + self.pre_run_setup() + maintainer = ReviewSegmentMaintainer( + self.config, + self.stop_event, + ) + maintainer.start() diff --git a/frigate/stats/util.py b/frigate/stats/util.py index 5078269eb..dbba0c191 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -5,13 +5,13 @@ import os import shutil import time from json import JSONDecodeError +from multiprocessing.managers import DictProxy from typing import Any, Optional import psutil import requests from requests.exceptions import RequestException -from frigate.camera import CameraMetrics from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.data_processing.types import DataProcessorMetrics @@ -53,7 +53,7 @@ def get_latest_version(config: FrigateConfig) -> str: def stats_init( config: FrigateConfig, - camera_metrics: dict[str, CameraMetrics], + camera_metrics: DictProxy, embeddings_metrics: DataProcessorMetrics | None, detectors: dict[str, ObjectDetectProcess], processes: dict[str, int], @@ -271,10 +271,12 @@ def stats_snapshot( stats["cameras"] = {} for name, camera_stats in camera_metrics.items(): total_detection_fps += camera_stats.detection_fps.value - pid = camera_stats.process.pid if camera_stats.process else None + pid = camera_stats.process_pid.value if camera_stats.process_pid.value else None ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None capture_pid = ( - camera_stats.capture_process.pid if camera_stats.capture_process else None + camera_stats.capture_process_pid.value + if camera_stats.capture_process_pid.value + else None ) stats["cameras"][name] = { "camera_fps": round(camera_stats.camera_fps.value, 2), diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index 0433af18e..90c0f9227 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -341,11 +341,14 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None: def empty_and_close_queue(q: mp.Queue): while True: try: - q.get(block=True, timeout=0.5) - except queue.Empty: - q.close() - q.join_thread() - return + try: + q.get(block=True, timeout=0.5) + except (queue.Empty, EOFError): + q.close() + q.join_thread() + return + except AttributeError: + pass def generate_color_palette(n): diff --git a/frigate/util/process.py b/frigate/util/process.py index ac15539fe..3501e585e 100644 --- a/frigate/util/process.py +++ b/frigate/util/process.py @@ -4,9 +4,8 @@ import multiprocessing as mp import signal import sys import threading -from functools import wraps from logging.handlers import QueueHandler -from typing import Any, Callable, Optional +from typing import Callable, Optional import frigate.log @@ -30,34 +29,12 @@ class BaseProcess(mp.Process): super().start(*args, **kwargs) self.after_start() - def __getattribute__(self, name: str) -> Any: - if name == "run": - run = super().__getattribute__("run") - - @wraps(run) - def run_wrapper(*args, **kwargs): - try: - self.before_run() - return run(*args, **kwargs) - finally: - self.after_run() - - return run_wrapper - - return super().__getattribute__(name) - def before_start(self) -> None: pass def after_start(self) -> None: pass - def before_run(self) -> None: - pass - - def after_run(self) -> None: - pass - class Process(BaseProcess): logger: logging.Logger @@ -73,7 +50,7 @@ class Process(BaseProcess): def before_start(self) -> None: self.__log_queue = frigate.log.log_listener.queue - def before_run(self) -> None: + def pre_run_setup(self) -> None: faulthandler.enable() def receiveSignal(signalNumber, frame): @@ -88,8 +65,6 @@ class Process(BaseProcess): signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) - self.logger = logging.getLogger(self.name) - logging.basicConfig(handlers=[], force=True) logging.getLogger().addHandler(QueueHandler(self.__log_queue)) diff --git a/frigate/video.py b/frigate/video.py index 03377d01a..80deae707 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,9 +1,7 @@ import datetime import logging -import multiprocessing as mp import os import queue -import signal import subprocess as sp import threading import time @@ -12,8 +10,8 @@ from multiprocessing.synchronize import Event as MpEvent from typing import Any import cv2 -from setproctitle import setproctitle +import frigate.util as util from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig @@ -53,7 +51,6 @@ from frigate.util.object import ( is_object_filtered, reduce_detections, ) -from frigate.util.services import listen logger = logging.getLogger(__name__) @@ -318,7 +315,7 @@ class CameraWatchdog(threading.Thread): ffmpeg_cmd, self.logger, self.logpipe, self.frame_size ) self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid - self.capture_thread = CameraCapture( + self.capture_thread = CameraCaptureRunner( self.config, self.shm_frame_count, self.frame_index, @@ -396,7 +393,7 @@ class CameraWatchdog(threading.Thread): return newest_segment_time -class CameraCapture(threading.Thread): +class CameraCaptureRunner(threading.Thread): def __init__( self, config: CameraConfig, @@ -440,103 +437,103 @@ class CameraCapture(threading.Thread): ) -def capture_camera( - config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics -): - stop_event = mp.Event() +class CameraCapture(util.Process): + def __init__( + self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics + ) -> None: + super().__init__(name=f"camera_capture:{config.name}", daemon=True) + self.config = config + self.shm_frame_count = shm_frame_count + self.camera_metrics = camera_metrics - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = f"capture:{config.name}" - setproctitle(f"frigate.capture:{config.name}") - - camera_watchdog = CameraWatchdog( - config, - shm_frame_count, - camera_metrics.frame_queue, - camera_metrics.camera_fps, - camera_metrics.skipped_fps, - camera_metrics.ffmpeg_pid, - stop_event, - ) - camera_watchdog.start() - camera_watchdog.join() + def run(self) -> None: + self.pre_run_setup() + camera_watchdog = CameraWatchdog( + self.config, + self.shm_frame_count, + self.camera_metrics.frame_queue, + self.camera_metrics.camera_fps, + self.camera_metrics.skipped_fps, + self.camera_metrics.ffmpeg_pid, + self.stop_event, + ) + camera_watchdog.start() + camera_watchdog.join() -def track_camera( - name, - config: CameraConfig, - model_config: ModelConfig, - labelmap: dict[int, str], - detection_queue: Queue, - detected_objects_queue, - camera_metrics: CameraMetrics, - ptz_metrics: PTZMetrics, - region_grid: list[list[dict[str, Any]]], -): - stop_event = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = f"process:{name}" - setproctitle(f"frigate.process:{name}") - listen() - - frame_queue = camera_metrics.frame_queue - - frame_shape = config.frame_shape - - motion_detector = ImprovedMotionDetector( - frame_shape, - config.motion, - config.detect.fps, - name=config.name, - ptz_metrics=ptz_metrics, - ) - object_detector = RemoteObjectDetector( - name, labelmap, detection_queue, model_config, stop_event - ) - - object_tracker = NorfairTracker(config, ptz_metrics) - - frame_manager = SharedMemoryFrameManager() - - # create communication for region grid updates - requestor = InterProcessRequestor() - - process_frames( - name, - requestor, - frame_queue, - frame_shape, - model_config, - config, - frame_manager, - motion_detector, - object_detector, - object_tracker, +class CameraTracker(util.Process): + def __init__( + self, + config: CameraConfig, + model_config: ModelConfig, + labelmap: dict[int, str], + detection_queue: Queue, detected_objects_queue, - camera_metrics, - stop_event, - ptz_metrics, - region_grid, - ) + camera_metrics: CameraMetrics, + ptz_metrics: PTZMetrics, + region_grid: list[list[dict[str, Any]]], + ) -> None: + super().__init__(name=f"camera_processor:{config.name}", daemon=True) + self.config = config + self.model_config = model_config + self.labelmap = labelmap + self.detection_queue = detection_queue + self.detected_objects_queue = detected_objects_queue + self.camera_metrics = camera_metrics + self.ptz_metrics = ptz_metrics + self.region_grid = region_grid - # empty the frame queue - logger.info(f"{name}: emptying frame queue") - while not frame_queue.empty(): - (frame_name, _) = frame_queue.get(False) - frame_manager.delete(frame_name) + def run(self) -> None: + self.pre_run_setup() + frame_queue = self.camera_metrics.frame_queue + frame_shape = self.config.frame_shape - logger.info(f"{name}: exiting subprocess") + motion_detector = ImprovedMotionDetector( + frame_shape, + self.config.motion, + self.config.detect.fps, + name=self.config.name, + ptz_metrics=self.ptz_metrics, + ) + object_detector = RemoteObjectDetector( + self.config.name, + self.labelmap, + self.detection_queue, + self.model_config, + self.stop_event, + ) + + object_tracker = NorfairTracker(self.config, self.ptz_metrics) + + frame_manager = SharedMemoryFrameManager() + + # create communication for region grid updates + requestor = InterProcessRequestor() + + process_frames( + requestor, + frame_queue, + frame_shape, + self.model_config, + self.config, + frame_manager, + motion_detector, + object_detector, + object_tracker, + self.detected_objects_queue, + self.camera_metrics, + self.stop_event, + self.ptz_metrics, + self.region_grid, + ) + + # empty the frame queue + logger.info(f"{self.config.name}: emptying frame queue") + while not frame_queue.empty(): + (frame_name, _) = frame_queue.get(False) + frame_manager.delete(frame_name) + + logger.info(f"{self.config.name}: exiting subprocess") def detect( @@ -577,7 +574,6 @@ def detect( def process_frames( - camera_name: str, requestor: InterProcessRequestor, frame_queue: Queue, frame_shape: tuple[int, int], @@ -597,7 +593,7 @@ def process_frames( next_region_update = get_tomorrow_at_time(2) config_subscriber = CameraConfigUpdateSubscriber( None, - {camera_name: camera_config}, + {camera_config.name: camera_config}, [ CameraConfigUpdateEnum.detect, CameraConfigUpdateEnum.enabled, @@ -653,7 +649,9 @@ def process_frames( and prev_enabled != camera_enabled and camera_metrics.frame_queue.empty() ): - logger.debug(f"Camera {camera_name} disabled, clearing tracked objects") + logger.debug( + f"Camera {camera_config.name} disabled, clearing tracked objects" + ) prev_enabled = camera_enabled # Clear norfair's dictionaries @@ -678,7 +676,7 @@ def process_frames( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update ): - region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name) + region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) next_region_update = get_tomorrow_at_time(2) try: @@ -698,7 +696,9 @@ def process_frames( frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1])) if frame is None: - logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.") + logger.debug( + f"{camera_config.name}: frame {frame_time} is not in memory store." + ) continue # look for motion if enabled @@ -937,7 +937,7 @@ def process_frames( ) cv2.imwrite( - f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg", + f"debug/frames/{camera_config.name}-{'{:.6f}'.format(frame_time)}.jpg", bgr_frame, ) # add to the queue if not full @@ -949,7 +949,7 @@ def process_frames( camera_metrics.process_fps.value = fps_tracker.eps() detected_objects_queue.put( ( - camera_name, + camera_config.name, frame_name, frame_time, detections, diff --git a/web/src/views/system/CameraMetrics.tsx b/web/src/views/system/CameraMetrics.tsx index ba2701926..3f5891265 100644 --- a/web/src/views/system/CameraMetrics.tsx +++ b/web/src/views/system/CameraMetrics.tsx @@ -173,7 +173,7 @@ export default function CameraMetrics({ }); series[key]["detect"].data.push({ x: statsIdx, - y: stats.cpu_usages[camStats.pid.toString()].cpu, + y: stats.cpu_usages[camStats.pid?.toString()]?.cpu, }); }); });