Use Fork-Server As Spawn Method (#18682)

* Set runtime

* Use count correctly

* Don't assume camera sizes

* Use separate zmq proxy for object detection

* Correct order

* Use forkserver

* Only store PID instead of entire process reference

* Cleanup

* Catch correct errors

* Fix typing

* Remove before_run from process util

The before_run never actually ran because:

You're right to suspect an issue with before_run not being called and a potential deadlock. The way you've implemented the run_wrapper using __getattribute__ for the run method of BaseProcess is a common pitfall in Python's multiprocessing, especially when combined with how multiprocessing.Process works internally.

Here's a breakdown of why before_run isn't being called and why you might be experiencing a deadlock:

The Problem: __getattribute__ and Process Serialization
When you create a multiprocessing.Process object and call start(), the multiprocessing module needs to serialize the process object (or at least enough of it to re-create the process in the new interpreter). It then pickles this serialized object and sends it to the newly spawned process.

The issue with your __getattribute__ implementation for run is that:

run is retrieved during serialization: When multiprocessing tries to pickle your Process object to send to the new process, it will likely access the run attribute. This triggers your __getattribute__ wrapper, which then tries to bind run_wrapper to self.
run_wrapper is bound to the parent process's self: The run_wrapper closure, when created in the parent process, captures the self (the Process instance) from the parent's memory space.
Deserialization creates a new object: In the child process, a new Process object is created by deserializing the pickled data. However, the run_wrapper method that was pickled still holds a reference to the self from the parent process. This is a subtle but critical distinction.
The child's run is not your wrapped run: When the child process starts, it internally calls its own run method. Because of the serialization and deserialization process, the run method that's ultimately executed in the child process is the original multiprocessing.Process.run or the Process.run if you had directly overridden it. Your __getattribute__ magic, which wraps run, isn't correctly applied to the Process object within the child's context.

* Cleanup

* Logging bugfix (#18465)

* use mp Manager to handle logging queues

A Python bug (https://github.com/python/cpython/issues/91555) was preventing logs from the embeddings maintainer process from printing. The bug is fixed in Python 3.14, but a viable workaround is to use the multiprocessing Manager, which better manages mp queues and causes the logging to work correctly.

* consolidate

* fix typing

* Fix typing

* Use global log queue

* Move to using process for logging

* Convert camera tracking to process

* Add more processes

* Finalize process

* Cleanup

* Cleanup typing

* Formatting

* Remove daemon

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
This commit is contained in:
Nicolas Mowen 2025-06-12 12:12:34 -06:00 committed by GitHub
parent 6f16ecdd48
commit 8485023442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 611 additions and 604 deletions

View File

@ -1,5 +1,6 @@
import argparse import argparse
import faulthandler import faulthandler
import multiprocessing as mp
import signal import signal
import sys import sys
import threading import threading
@ -15,10 +16,11 @@ from frigate.util.config import find_config_file
def main() -> None: def main() -> None:
manager = mp.Manager()
faulthandler.enable() faulthandler.enable()
# Setup the logging thread # Setup the logging thread
setup_logging() setup_logging(manager)
threading.current_thread().name = "frigate" threading.current_thread().name = "frigate"
@ -108,8 +110,9 @@ def main() -> None:
sys.exit(0) sys.exit(0)
# Run the main application. # Run the main application.
FrigateApp(config).start() FrigateApp(config, manager).start()
if __name__ == "__main__": if __name__ == "__main__":
mp.set_start_method("forkserver", force=True)
main() main()

View File

@ -5,6 +5,7 @@ import os
import secrets import secrets
import shutil import shutil
from multiprocessing import Queue from multiprocessing import Queue
from multiprocessing.managers import DictProxy, SyncManager
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Optional from typing import Optional
@ -13,7 +14,6 @@ import uvicorn
from peewee_migrate import Router from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
import frigate.util as util
from frigate.api.auth import hash_password from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
@ -23,6 +23,7 @@ from frigate.comms.dispatcher import Dispatcher
from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient from frigate.comms.mqtt import MqttClient
from frigate.comms.object_detector_signaler import DetectorProxy
from frigate.comms.webpush import WebPushClient from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy from frigate.comms.zmq_proxy import ZmqProxy
@ -40,10 +41,11 @@ from frigate.const import (
) )
from frigate.data_processing.types import DataProcessorMetrics from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings from frigate.embeddings import EmbeddingProcess, EmbeddingsContext
from frigate.events.audio import AudioProcessor from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup from frigate.events.cleanup import EventCleanup
from frigate.events.maintainer import EventProcessor from frigate.events.maintainer import EventProcessor
from frigate.log import _stop_logging
from frigate.models import ( from frigate.models import (
Event, Event,
Export, Export,
@ -56,13 +58,13 @@ from frigate.models import (
User, User,
) )
from frigate.object_detection.base import ObjectDetectProcess from frigate.object_detection.base import ObjectDetectProcess
from frigate.output.output import output_frames from frigate.output.output import OutputProcess
from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports from frigate.record.export import migrate_exports
from frigate.record.record import manage_recordings from frigate.record.record import RecordProcess
from frigate.review.review import manage_review_segments from frigate.review.review import ReviewProcess
from frigate.stats.emitter import StatsEmitter from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
@ -77,16 +79,19 @@ logger = logging.getLogger(__name__)
class FrigateApp: class FrigateApp:
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig, manager: SyncManager) -> None:
self.metrics_manager = manager
self.audio_process: Optional[mp.Process] = None self.audio_process: Optional[mp.Process] = None
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {} self.camera_metrics: DictProxy = self.metrics_manager.dict()
self.embeddings_metrics: DataProcessorMetrics | None = ( self.embeddings_metrics: DataProcessorMetrics | None = (
DataProcessorMetrics(list(config.classification.custom.keys())) DataProcessorMetrics(
self.metrics_manager, list(config.classification.custom.keys())
)
if ( if (
config.semantic_search.enabled config.semantic_search.enabled
or config.lpr.enabled or config.lpr.enabled
@ -124,7 +129,7 @@ class FrigateApp:
def init_camera_metrics(self) -> None: def init_camera_metrics(self) -> None:
# create camera_metrics # create camera_metrics
for camera_name in self.config.cameras.keys(): for camera_name in self.config.cameras.keys():
self.camera_metrics[camera_name] = CameraMetrics() self.camera_metrics[camera_name] = CameraMetrics(self.metrics_manager)
self.ptz_metrics[camera_name] = PTZMetrics( self.ptz_metrics[camera_name] = PTZMetrics(
autotracker_enabled=self.config.cameras[ autotracker_enabled=self.config.cameras[
camera_name camera_name
@ -218,24 +223,14 @@ class FrigateApp:
self.processes["go2rtc"] = proc.info["pid"] self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None: def init_recording_manager(self) -> None:
recording_process = util.Process( recording_process = RecordProcess(self.config)
target=manage_recordings,
name="recording_manager",
args=(self.config,),
)
recording_process.daemon = True
self.recording_process = recording_process self.recording_process = recording_process
recording_process.start() recording_process.start()
self.processes["recording"] = recording_process.pid or 0 self.processes["recording"] = recording_process.pid or 0
logger.info(f"Recording process started: {recording_process.pid}") logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None: def init_review_segment_manager(self) -> None:
review_segment_process = util.Process( review_segment_process = ReviewProcess(self.config)
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
)
review_segment_process.daemon = True
self.review_segment_process = review_segment_process self.review_segment_process = review_segment_process
review_segment_process.start() review_segment_process.start()
self.processes["review_segment"] = review_segment_process.pid or 0 self.processes["review_segment"] = review_segment_process.pid or 0
@ -254,15 +249,10 @@ class FrigateApp:
): ):
return return
embedding_process = util.Process( embedding_process = EmbeddingProcess(
target=manage_embeddings, self.config,
name="embeddings_manager", self.embeddings_metrics,
args=(
self.config,
self.embeddings_metrics,
),
) )
embedding_process.daemon = True
self.embedding_process = embedding_process self.embedding_process = embedding_process
embedding_process.start() embedding_process.start()
self.processes["embeddings"] = embedding_process.pid or 0 self.processes["embeddings"] = embedding_process.pid or 0
@ -330,6 +320,7 @@ class FrigateApp:
self.inter_config_updater = CameraConfigUpdatePublisher() self.inter_config_updater = CameraConfigUpdatePublisher()
self.event_metadata_updater = EventMetadataPublisher() self.event_metadata_updater = EventMetadataPublisher()
self.inter_zmq_proxy = ZmqProxy() self.inter_zmq_proxy = ZmqProxy()
self.detection_proxy = DetectorProxy()
def init_onvif(self) -> None: def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config, self.ptz_metrics) self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
@ -418,12 +409,7 @@ class FrigateApp:
self.detected_frames_processor.start() self.detected_frames_processor.start()
def start_video_output_processor(self) -> None: def start_video_output_processor(self) -> None:
output_processor = util.Process( output_processor = OutputProcess(self.config)
target=output_frames,
name="output_processor",
args=(self.config,),
)
output_processor.daemon = True
self.output_processor = output_processor self.output_processor = output_processor
output_processor.start() output_processor.start()
logger.info(f"Output process started: {output_processor.pid}") logger.info(f"Output process started: {output_processor.pid}")
@ -554,11 +540,11 @@ class FrigateApp:
self.init_recording_manager() self.init_recording_manager()
self.init_review_segment_manager() self.init_review_segment_manager()
self.init_go2rtc() self.init_go2rtc()
self.start_detectors()
self.init_embeddings_manager() self.init_embeddings_manager()
self.bind_database() self.bind_database()
self.check_db_data_migrations() self.check_db_data_migrations()
self.init_inter_process_communicator() self.init_inter_process_communicator()
self.start_detectors()
self.init_dispatcher() self.init_dispatcher()
self.init_embeddings_client() self.init_embeddings_client()
self.start_video_output_processor() self.start_video_output_processor()
@ -661,10 +647,13 @@ class FrigateApp:
self.inter_config_updater.stop() self.inter_config_updater.stop()
self.event_metadata_updater.stop() self.event_metadata_updater.stop()
self.inter_zmq_proxy.stop() self.inter_zmq_proxy.stop()
self.detection_proxy.stop()
while len(self.detection_shms) > 0: while len(self.detection_shms) > 0:
shm = self.detection_shms.pop() shm = self.detection_shms.pop()
shm.close() shm.close()
shm.unlink() shm.unlink()
_stop_logging()
self.metrics_manager.shutdown()
os._exit(os.EX_OK) os._exit(os.EX_OK)

View File

@ -1,7 +1,7 @@
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.managers import SyncManager
from multiprocessing.sharedctypes import Synchronized from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event from multiprocessing.synchronize import Event
from typing import Optional
class CameraMetrics: class CameraMetrics:
@ -16,25 +16,25 @@ class CameraMetrics:
frame_queue: mp.Queue frame_queue: mp.Queue
process: Optional[mp.Process] process_pid: Synchronized
capture_process: Optional[mp.Process] capture_process_pid: Synchronized
ffmpeg_pid: Synchronized ffmpeg_pid: Synchronized
def __init__(self): def __init__(self, manager: SyncManager):
self.camera_fps = mp.Value("d", 0) self.camera_fps = manager.Value("d", 0)
self.detection_fps = mp.Value("d", 0) self.detection_fps = manager.Value("d", 0)
self.detection_frame = mp.Value("d", 0) self.detection_frame = manager.Value("d", 0)
self.process_fps = mp.Value("d", 0) self.process_fps = manager.Value("d", 0)
self.skipped_fps = mp.Value("d", 0) self.skipped_fps = manager.Value("d", 0)
self.read_start = mp.Value("d", 0) self.read_start = manager.Value("d", 0)
self.audio_rms = mp.Value("d", 0) self.audio_rms = manager.Value("d", 0)
self.audio_dBFS = mp.Value("d", 0) self.audio_dBFS = manager.Value("d", 0)
self.frame_queue = mp.Queue(maxsize=2) self.frame_queue = manager.Queue(maxsize=2)
self.process = None self.process_pid = manager.Value("i", 0)
self.capture_process = None self.capture_process_pid = manager.Value("i", 0)
self.ffmpeg_pid = mp.Value("i", 0) self.ffmpeg_pid = manager.Value("i", 0)
class PTZMetrics: class PTZMetrics:

View File

@ -1,10 +1,12 @@
"""Create and maintain camera processes / management.""" """Create and maintain camera processes / management."""
import logging import logging
import multiprocessing as mp
import os import os
import shutil import shutil
import threading import threading
from multiprocessing import Queue from multiprocessing import Queue
from multiprocessing.managers import DictProxy
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
@ -16,11 +18,10 @@ from frigate.config.camera.updater import (
) )
from frigate.const import SHM_FRAMES_VAR from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions from frigate.models import Regions
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import empty_and_close_queue from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid from frigate.util.object import get_camera_regions_grid
from frigate.video import capture_camera, track_camera from frigate.video import CameraCapture, CameraTracker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,7 +32,7 @@ class CameraMaintainer(threading.Thread):
config: FrigateConfig, config: FrigateConfig,
detection_queue: Queue, detection_queue: Queue,
detected_frames_queue: Queue, detected_frames_queue: Queue,
camera_metrics: dict[str, CameraMetrics], camera_metrics: DictProxy,
ptz_metrics: dict[str, PTZMetrics], ptz_metrics: dict[str, PTZMetrics],
stop_event: MpEvent, stop_event: MpEvent,
): ):
@ -53,6 +54,8 @@ class CameraMaintainer(threading.Thread):
], ],
) )
self.shm_count = self.__calculate_shm_frame_count() self.shm_count = self.__calculate_shm_frame_count()
self.camera_processes: dict[str, mp.Process] = {}
self.capture_processes: dict[str, mp.Process] = {}
def __init_historical_regions(self) -> None: def __init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras # delete region grids for removed or renamed cameras
@ -94,7 +97,7 @@ class CameraMaintainer(threading.Thread):
# leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them. # leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them.
cam_total_frame_size += 2 * round( cam_total_frame_size += 2 * round(
(camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576, (1280 * 720 * 1.5 + 270480) / 1048576,
1, 1,
) )
@ -151,24 +154,19 @@ class CameraMaintainer(threading.Thread):
except FileExistsError: except FileExistsError:
pass pass
camera_process = FrigateProcess( camera_process = CameraTracker(
target=track_camera, config,
name=f"camera_processor:{name}", self.config.model,
args=( self.config.model.merged_labelmap,
config.name, self.detection_queue,
config, self.detected_frames_queue,
self.config.model, self.camera_metrics[name],
self.config.model.merged_labelmap, self.ptz_metrics[name],
self.detection_queue, self.region_grids[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
) )
self.camera_metrics[config.name].process = camera_process self.camera_processes[config.name] = camera_process
camera_process.start() camera_process.start()
self.camera_metrics[config.name].process_pid.value = camera_process.pid
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture( def __start_camera_capture(
@ -179,36 +177,33 @@ class CameraMaintainer(threading.Thread):
return return
# pre-create shms # pre-create shms
for i in range(10 if runtime else self.shm_count): count = 10 if runtime else self.shm_count
for i in range(count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1] frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size) self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = FrigateProcess( capture_process = CameraCapture(config, count, self.camera_metrics[name])
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, self.shm_count, self.camera_metrics[name]),
)
capture_process.daemon = True capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process self.capture_processes[name] = capture_process
capture_process.start() capture_process.start()
self.camera_metrics[name].capture_process_pid.value = capture_process.pid
logger.info(f"Capture process started for {name}: {capture_process.pid}") logger.info(f"Capture process started for {name}: {capture_process.pid}")
def __stop_camera_capture_process(self, camera: str) -> None: def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.camera_metrics[camera].capture_process capture_process = self.capture_processes[camera]
if capture_process is not None: if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop") logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate() capture_process.terminate()
capture_process.join() capture_process.join()
def __stop_camera_process(self, camera: str) -> None: def __stop_camera_process(self, camera: str) -> None:
metrics = self.camera_metrics[camera] camera_process = self.camera_processes[camera]
camera_process = metrics.process
if camera_process is not None: if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop") logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate() camera_process.terminate()
camera_process.join() camera_process.join()
logger.info(f"Closing frame queue for {camera}") logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(metrics.frame_queue) empty_and_close_queue(self.camera_metrics[camera].frame_queue)
def run(self): def run(self):
self.__init_historical_regions() self.__init_historical_regions()
@ -230,18 +225,20 @@ class CameraMaintainer(threading.Thread):
runtime=True, runtime=True,
) )
self.__start_camera_capture( self.__start_camera_capture(
camera, self.update_subscriber.camera_configs[camera] camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
) )
elif update_type == CameraConfigUpdateEnum.remove.name: elif update_type == CameraConfigUpdateEnum.remove.name:
self.__stop_camera_capture_process(camera) self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera) self.__stop_camera_process(camera)
# ensure the capture processes are done # ensure the capture processes are done
for camera in self.camera_metrics.keys(): for camera in self.camera_processes.keys():
self.__stop_camera_capture_process(camera) self.__stop_camera_capture_process(camera)
# ensure the camera processors are done # ensure the camera processors are done
for camera in self.camera_metrics.keys(): for camera in self.capture_processes.keys():
self.__stop_camera_process(camera) self.__stop_camera_process(camera)
self.update_subscriber.stop() self.update_subscriber.stop()

View File

@ -1,21 +1,92 @@
"""Facilitates communication between processes for object detection signals.""" """Facilitates communication between processes for object detection signals."""
from .zmq_proxy import Publisher, Subscriber import threading
import zmq
SOCKET_PUB = "ipc:///tmp/cache/detector_pub"
SOCKET_SUB = "ipc:///tmp/cache/detector_sub"
class ObjectDetectorPublisher(Publisher): class ZmqProxyRunner(threading.Thread):
def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
super().__init__(name="detector_proxy")
self.context = context
def run(self) -> None:
"""Run the proxy."""
incoming = self.context.socket(zmq.XSUB)
incoming.bind(SOCKET_PUB)
outgoing = self.context.socket(zmq.XPUB)
outgoing.bind(SOCKET_SUB)
# Blocking: This will unblock (via exception) when we destroy the context
# The incoming and outgoing sockets will be closed automatically
# when the context is destroyed as well.
try:
zmq.proxy(incoming, outgoing)
except zmq.ZMQError:
pass
class DetectorProxy:
"""Proxies object detection signals."""
def __init__(self) -> None:
self.context = zmq.Context()
self.runner = ZmqProxyRunner(self.context)
self.runner.start()
def stop(self) -> None:
# destroying the context will tell the proxy to stop
self.context.destroy()
self.runner.join()
class ObjectDetectorPublisher:
"""Publishes signal for object detection to different processes.""" """Publishes signal for object detection to different processes."""
topic_base = "object_detector/" topic_base = "object_detector/"
def __init__(self, topic: str = "") -> None:
self.topic = f"{self.topic_base}{topic}"
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.connect(SOCKET_PUB)
class ObjectDetectorSubscriber(Subscriber): def publish(self, sub_topic: str = "") -> None:
"""Publish message."""
self.socket.send_string(f"{self.topic}{sub_topic}/")
def stop(self) -> None:
self.socket.close()
self.context.destroy()
class ObjectDetectorSubscriber:
"""Simplifies receiving a signal for object detection.""" """Simplifies receiving a signal for object detection."""
topic_base = "object_detector/" topic_base = "object_detector/"
def __init__(self, topic: str) -> None: def __init__(self, topic: str = "") -> None:
super().__init__(topic) self.topic = f"{self.topic_base}{topic}/"
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, self.topic)
self.socket.connect(SOCKET_SUB)
def check_for_update(self): def check_for_update(self, timeout: float = 5) -> str | None:
return super().check_for_update(timeout=5) """Returns message or None if no update."""
try:
has_update, _, _ = zmq.select([self.socket], [], [], timeout)
if has_update:
return self.socket.recv_string(flags=zmq.NOBLOCK)
except zmq.ZMQError:
pass
return None
def stop(self) -> None:
self.socket.close()
self.context.destroy()

View File

@ -1,7 +1,7 @@
"""Embeddings types.""" """Embeddings types."""
import multiprocessing as mp
from enum import Enum from enum import Enum
from multiprocessing.managers import SyncManager
from multiprocessing.sharedctypes import Synchronized from multiprocessing.sharedctypes import Synchronized
import sherpa_onnx import sherpa_onnx
@ -20,25 +20,27 @@ class DataProcessorMetrics:
alpr_pps: Synchronized alpr_pps: Synchronized
yolov9_lpr_speed: Synchronized yolov9_lpr_speed: Synchronized
yolov9_lpr_pps: Synchronized yolov9_lpr_pps: Synchronized
classification_speeds: dict[str, Synchronized] = {} classification_speeds: dict[str, Synchronized]
classification_cps: dict[str, Synchronized] = {} classification_cps: dict[str, Synchronized]
def __init__(self, custom_classification_models: list[str]): def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
self.image_embeddings_speed = mp.Value("d", 0.0) self.image_embeddings_speed = manager.Value("d", 0.0)
self.image_embeddings_eps = mp.Value("d", 0.0) self.image_embeddings_eps = manager.Value("d", 0.0)
self.text_embeddings_speed = mp.Value("d", 0.0) self.text_embeddings_speed = manager.Value("d", 0.0)
self.text_embeddings_eps = mp.Value("d", 0.0) self.text_embeddings_eps = manager.Value("d", 0.0)
self.face_rec_speed = mp.Value("d", 0.0) self.face_rec_speed = manager.Value("d", 0.0)
self.face_rec_fps = mp.Value("d", 0.0) self.face_rec_fps = manager.Value("d", 0.0)
self.alpr_speed = mp.Value("d", 0.0) self.alpr_speed = manager.Value("d", 0.0)
self.alpr_pps = mp.Value("d", 0.0) self.alpr_pps = manager.Value("d", 0.0)
self.yolov9_lpr_speed = mp.Value("d", 0.0) self.yolov9_lpr_speed = manager.Value("d", 0.0)
self.yolov9_lpr_pps = mp.Value("d", 0.0) self.yolov9_lpr_pps = manager.Value("d", 0.0)
self.classification_speeds = manager.dict()
self.classification_cps = manager.dict()
if custom_classification_models: if custom_classification_models:
for key in custom_classification_models: for key in custom_classification_models:
self.classification_speeds[key] = mp.Value("d", 0.0) self.classification_speeds[key] = manager.Value("d", 0.0)
self.classification_cps[key] = mp.Value("d", 0.0) self.classification_cps[key] = manager.Value("d", 0.0)
class DataProcessorModelRunner: class DataProcessorModelRunner:

View File

@ -3,26 +3,22 @@
import base64 import base64
import json import json
import logging import logging
import multiprocessing as mp
import os import os
import signal
import threading import threading
from types import FrameType from typing import Any, Union
from typing import Any, Optional, Union
import regex import regex
from pathvalidate import ValidationError, sanitize_filename from pathvalidate import ValidationError, sanitize_filename
from setproctitle import setproctitle
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR, FACE_DIR from frigate.const import CONFIG_DIR, FACE_DIR
from frigate.data_processing.types import DataProcessorMetrics from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event, Recordings from frigate.models import Event
from frigate.util import Process as FrigateProcess
from frigate.util.builtin import serialize from frigate.util.builtin import serialize
from frigate.util.classification import kickoff_model_training from frigate.util.classification import kickoff_model_training
from frigate.util.services import listen
from .maintainer import EmbeddingMaintainer from .maintainer import EmbeddingMaintainer
from .util import ZScoreNormalization from .util import ZScoreNormalization
@ -30,40 +26,22 @@ from .util import ZScoreNormalization
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> None: class EmbeddingProcess(FrigateProcess):
stop_event = mp.Event() def __init__(
self, config: FrigateConfig, metrics: DataProcessorMetrics | None
) -> None:
super().__init__(name="frigate.embeddings_manager", daemon=True)
self.config = config
self.metrics = metrics
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def run(self) -> None:
stop_event.set() self.pre_run_setup()
maintainer = EmbeddingMaintainer(
signal.signal(signal.SIGTERM, receiveSignal) self.config,
signal.signal(signal.SIGINT, receiveSignal) self.metrics,
self.stop_event,
threading.current_thread().name = "process:embeddings_manager" )
setproctitle("frigate.embeddings_manager") maintainer.start()
listen()
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
maintainer = EmbeddingMaintainer(
db,
config,
metrics,
stop_event,
)
maintainer.start()
class EmbeddingsContext: class EmbeddingsContext:

View File

@ -12,7 +12,6 @@ from typing import Any, Optional
import cv2 import cv2
import numpy as np import numpy as np
from peewee import DoesNotExist from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor, LicensePlateRealTimeProcessor,
) )
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client from frigate.genai import get_genai_client
from frigate.models import Event from frigate.models import Event, Recordings
from frigate.types import TrackedObjectUpdateTypesEnum from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize from frigate.util.builtin import serialize
from frigate.util.image import ( from frigate.util.image import (
@ -82,9 +82,8 @@ class EmbeddingMaintainer(threading.Thread):
def __init__( def __init__(
self, self,
db: SqliteQueueDatabase,
config: FrigateConfig, config: FrigateConfig,
metrics: DataProcessorMetrics, metrics: DataProcessorMetrics | None,
stop_event: MpEvent, stop_event: MpEvent,
) -> None: ) -> None:
super().__init__(name="embeddings_maintainer") super().__init__(name="embeddings_maintainer")
@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread):
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove], [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
) )
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in config.cameras.values() if c.enabled])
),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
if config.semantic_search.enabled: if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics) self.embeddings = Embeddings(config, db, metrics)

View File

@ -6,12 +6,12 @@ import random
import string import string
import threading import threading
import time import time
from multiprocessing.managers import DictProxy
from typing import Any, Tuple from typing import Any, Tuple
import numpy as np import numpy as np
import frigate.util as util import frigate.util as util
from frigate.camera import CameraMetrics
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import ( from frigate.comms.event_metadata_updater import (
EventMetadataPublisher, EventMetadataPublisher,
@ -83,7 +83,7 @@ class AudioProcessor(util.Process):
self, self,
config: FrigateConfig, config: FrigateConfig,
cameras: list[CameraConfig], cameras: list[CameraConfig],
camera_metrics: dict[str, CameraMetrics], camera_metrics: DictProxy,
): ):
super().__init__(name="frigate.audio_manager", daemon=True) super().__init__(name="frigate.audio_manager", daemon=True)
@ -93,7 +93,7 @@ class AudioProcessor(util.Process):
if any( if any(
[ [
conf.audio_transcription.enabled_in_config conf.audio_transcription.enabled_in_config == True
for conf in config.cameras.values() for conf in config.cameras.values()
] ]
): ):
@ -105,6 +105,7 @@ class AudioProcessor(util.Process):
self.transcription_model_runner = None self.transcription_model_runner = None
def run(self) -> None: def run(self) -> None:
self.pre_run_setup()
audio_threads: list[AudioEventMaintainer] = [] audio_threads: list[AudioEventMaintainer] = []
threading.current_thread().name = "process:audio_manager" threading.current_thread().name = "process:audio_manager"
@ -146,7 +147,7 @@ class AudioEventMaintainer(threading.Thread):
self, self,
camera: CameraConfig, camera: CameraConfig,
config: FrigateConfig, config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics], camera_metrics: DictProxy,
audio_transcription_model_runner: AudioTranscriptionModelRunner | None, audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event, stop_event: threading.Event,
) -> None: ) -> None:

View File

@ -1,11 +1,13 @@
# In log.py
import atexit import atexit
import logging import logging
import multiprocessing as mp
import os import os
import sys import sys
import threading import threading
from collections import deque from collections import deque
from logging.handlers import QueueHandler, QueueListener from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from queue import Queue
from typing import Deque, Optional from typing import Deque, Optional
from frigate.util.builtin import clean_camera_user_pass from frigate.util.builtin import clean_camera_user_pass
@ -32,12 +34,12 @@ LOG_HANDLER.addFilter(
) )
log_listener: Optional[QueueListener] = None log_listener: Optional[QueueListener] = None
log_queue: Optional[Queue] = None
def setup_logging() -> None: def setup_logging(manager: SyncManager) -> None:
global log_listener global log_listener, log_queue
log_queue = manager.Queue()
log_queue: mp.Queue = mp.Queue()
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
atexit.register(_stop_logging) atexit.register(_stop_logging)
@ -54,7 +56,6 @@ def setup_logging() -> None:
def _stop_logging() -> None: def _stop_logging() -> None:
global log_listener global log_listener
if log_listener is not None: if log_listener is not None:
log_listener.stop() log_listener.stop()
log_listener = None log_listener = None

View File

@ -1,16 +1,11 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp
import os
import queue import queue
import signal
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing import Queue, Value from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
import numpy as np import numpy as np
from setproctitle import setproctitle
import frigate.util as util import frigate.util as util
from frigate.comms.object_detector_signaler import ( from frigate.comms.object_detector_signaler import (
@ -25,7 +20,6 @@ from frigate.detectors.detector_config import (
) )
from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.services import listen
from .util import tensor_transform from .util import tensor_transform
@ -90,73 +84,75 @@ class LocalObjectDetector(ObjectDetector):
return self.detect_api.detect_raw(tensor_input=tensor_input) return self.detect_api.detect_raw(tensor_input=tensor_input)
def run_detector( class DetectorRunner(util.Process):
name: str, def __init__(
detection_queue: Queue, self,
cameras: list[str], name,
avg_speed: Value, detection_queue: Queue,
start: Value, cameras: list[str],
detector_config: BaseDetectorConfig, avg_speed: Value,
): start_time: Value,
threading.current_thread().name = f"detector:{name}" detector_config: BaseDetectorConfig,
logger = logging.getLogger(f"detector.{name}") ) -> None:
logger.info(f"Starting detection process: {os.getpid()}") super().__init__(name=name, daemon=True)
setproctitle(f"frigate.detector.{name}") self.detection_queue = detection_queue
listen() self.cameras = cameras
self.avg_speed = avg_speed
self.start_time = start_time
self.detector_config = detector_config
self.outputs: dict = {}
stop_event: MpEvent = mp.Event() def create_output_shm(self, name: str):
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
def create_output_shm(name: str):
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np} self.outputs[name] = {"shm": out_shm, "np": out_np}
frame_manager = SharedMemoryFrameManager() def run(self) -> None:
object_detector = LocalObjectDetector(detector_config=detector_config) self.pre_run_setup()
detector_publisher = ObjectDetectorPublisher()
outputs = {} frame_manager = SharedMemoryFrameManager()
for name in cameras: object_detector = LocalObjectDetector(detector_config=self.detector_config)
create_output_shm(name) detector_publisher = ObjectDetectorPublisher()
while not stop_event.is_set(): for name in self.cameras:
try: self.create_output_shm(name)
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
)
if input_frame is None: while not self.stop_event.is_set():
logger.warning(f"Failed to get frame {connection_id} from SHM") try:
continue connection_id = self.detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
3,
),
)
# detect and send the output if input_frame is None:
start.value = datetime.datetime.now().timestamp() logger.warning(f"Failed to get frame {connection_id} from SHM")
detections = object_detector.detect_raw(input_frame) continue
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
if connection_id not in outputs: # detect and send the output
create_output_shm(connection_id) self.start_time.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - self.start_time.value
frame_manager.close(connection_id)
outputs[connection_id]["np"][:] = detections[:] if connection_id not in self.outputs:
signal_id = f"{connection_id}/update" self.create_output_shm(connection_id)
detector_publisher.publish(signal_id, signal_id)
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10 self.outputs[connection_id]["np"][:] = detections[:]
detector_publisher.publish(connection_id)
self.start_time.value = 0.0
detector_publisher.stop() self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10
logger.info("Exited detection process...")
detector_publisher.stop()
logger.info("Exited detection process...")
class ObjectDetectProcess: class ObjectDetectProcess:
@ -193,19 +189,14 @@ class ObjectDetectProcess:
self.detection_start.value = 0.0 self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive(): if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop() self.stop()
self.detect_process = util.Process( self.detect_process = DetectorRunner(
target=run_detector, f"detector:{self.name}",
name=f"detector:{self.name}", self.detection_queue,
args=( self.cameras,
self.name, self.avg_inference_speed,
self.detection_queue, self.detection_start,
self.cameras, self.detector_config,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
) )
self.detect_process.daemon = True
self.detect_process.start() self.detect_process.start()
@ -231,7 +222,7 @@ class RemoteObjectDetector:
) )
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.detector_subscriber = ObjectDetectorSubscriber(f"{name}/update") self.detector_subscriber = ObjectDetectorSubscriber(name)
def detect(self, tensor_input, threshold=0.4): def detect(self, tensor_input, threshold=0.4):
detections = [] detections = []

View File

@ -2,14 +2,11 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp
import os import os
import shutil import shutil
import signal
import threading import threading
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
from setproctitle import setproctitle
from ws4py.server.wsgirefserver import ( from ws4py.server.wsgirefserver import (
WebSocketWSGIHandler, WebSocketWSGIHandler,
WebSocketWSGIRequestHandler, WebSocketWSGIRequestHandler,
@ -17,6 +14,7 @@ from ws4py.server.wsgirefserver import (
) )
from ws4py.server.wsgiutils import WebSocketWSGIApplication from ws4py.server.wsgiutils import WebSocketWSGIApplication
import frigate.util as util
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.ws import WebSocket from frigate.comms.ws import WebSocket
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
@ -73,189 +71,193 @@ def check_disabled_camera_update(
birdseye.all_cameras_disabled() birdseye.all_cameras_disabled()
def output_frames( class OutputProcess(util.Process):
config: FrigateConfig, def __init__(self, config: FrigateConfig) -> None:
): super().__init__(name="frigate.output", daemon=True)
threading.current_thread().name = "output" self.config = config
setproctitle("frigate.output")
stop_event = mp.Event() def run(self) -> None:
self.pre_run_setup()
def receiveSignal(signalNumber, frame): frame_manager = SharedMemoryFrameManager()
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) # start a websocket server on 8082
signal.signal(signal.SIGINT, receiveSignal) WebSocketWSGIHandler.http_version = "1.1"
websocket_server = make_server(
frame_manager = SharedMemoryFrameManager() "127.0.0.1",
8082,
# start a websocket server on 8082 server_class=WSGIServer,
WebSocketWSGIHandler.http_version = "1.1" handler_class=WebSocketWSGIRequestHandler,
websocket_server = make_server( app=WebSocketWSGIApplication(handler_cls=WebSocket),
"127.0.0.1",
8082,
server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocket),
)
websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
jsmpeg_cameras: dict[str, JsmpegCamera] = {}
birdseye: Birdseye | None = None
preview_recorders: dict[str, PreviewRecorder] = {}
preview_write_times: dict[str, float] = {}
failed_frame_requests: dict[str, int] = {}
last_disabled_cam_check = datetime.datetime.now().timestamp()
move_preview_frames("cache")
for camera, cam_config in config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
if config.birdseye.enabled:
birdseye = Birdseye(config, stop_event, websocket_server)
websocket_thread.start()
while not stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
config, birdseye, preview_recorders, preview_write_times
)
if not topic:
continue
(
camera,
frame_name,
frame_time,
current_tracked_objects,
motion_boxes,
_,
) = data
if not config.cameras[camera].enabled:
continue
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
if failed_frame_requests[camera] > config.cameras[camera].detect.fps:
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
continue
else:
failed_frame_requests[camera] = 0
# send frames for low fps recording
preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
) )
preview_write_times[camera] = frame_time websocket_server.initialize_websockets_manager()
websocket_thread = threading.Thread(target=websocket_server.serve_forever)
# send camera frame to ffmpeg process if websockets are connected detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
if any( config_subscriber = CameraConfigUpdateSubscriber(
ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager self.config,
): self.config.cameras,
# write to the converter for the camera if clients are listening to the specific camera [
jsmpeg_cameras[camera].write_frame(frame.tobytes()) CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.birdseye,
CameraConfigUpdateEnum.enabled,
CameraConfigUpdateEnum.record,
],
)
# send output data to birdseye if websocket is connected or restreaming jsmpeg_cameras: dict[str, JsmpegCamera] = {}
if config.birdseye.enabled and ( birdseye: Birdseye | None = None
config.birdseye.restream preview_recorders: dict[str, PreviewRecorder] = {}
or any( preview_write_times: dict[str, float] = {}
ws.environ["PATH_INFO"].endswith("birdseye") failed_frame_requests: dict[str, int] = {}
for ws in websocket_server.manager last_disabled_cam_check = datetime.datetime.now().timestamp()
move_preview_frames("cache")
for camera, cam_config in self.config.cameras.items():
if not cam_config.enabled_in_config:
continue
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
) )
): preview_recorders[camera] = PreviewRecorder(cam_config)
birdseye.write_data( preview_write_times[camera] = 0
if self.config.birdseye.enabled:
birdseye = Birdseye(self.config, self.stop_event, websocket_server)
websocket_thread.start()
while not self.stop_event.is_set():
# check if there is an updated config
updates = config_subscriber.check_for_updates()
if "add" in updates:
for camera in updates["add"]:
jsmpeg_cameras[camera] = JsmpegCamera(
cam_config, self.stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(cam_config)
preview_write_times[camera] = 0
(topic, data) = detection_subscriber.check_for_update(timeout=1)
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
# check disabled cameras every 5 seconds
last_disabled_cam_check = now
check_disabled_camera_update(
self.config, birdseye, preview_recorders, preview_write_times
)
if not topic:
continue
(
camera, camera,
frame_name,
frame_time,
current_tracked_objects, current_tracked_objects,
motion_boxes, motion_boxes,
frame_time, _,
frame, ) = data
if not self.config.cameras[camera].enabled:
continue
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
) )
frame_manager.close(frame_name) if frame is None:
logger.debug(f"Failed to get frame {frame_name} from SHM")
failed_frame_requests[camera] = failed_frame_requests.get(camera, 0) + 1
move_preview_frames("clips") if (
failed_frame_requests[camera]
> self.config.cameras[camera].detect.fps
):
logger.warning(
f"Failed to retrieve many frames for {camera} from SHM, consider increasing SHM size if this continues."
)
while True: continue
(topic, data) = detection_subscriber.check_for_update(timeout=0) else:
failed_frame_requests[camera] = 0
if not topic: # send frames for low fps recording
break preview_recorders[camera].write_data(
current_tracked_objects, motion_boxes, frame_time, frame
)
preview_write_times[camera] = frame_time
( # send camera frame to ffmpeg process if websockets are connected
camera, if any(
frame_name, ws.environ["PATH_INFO"].endswith(camera)
frame_time, for ws in websocket_server.manager
current_tracked_objects, ):
motion_boxes, # write to the converter for the camera if clients are listening to the specific camera
regions, jsmpeg_cameras[camera].write_frame(frame.tobytes())
) = data
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv) # send output data to birdseye if websocket is connected or restreaming
frame_manager.close(frame_name) if self.config.birdseye.enabled and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
):
birdseye.write_data(
camera,
current_tracked_objects,
motion_boxes,
frame_time,
frame,
)
detection_subscriber.stop() frame_manager.close(frame_name)
for jsmpeg in jsmpeg_cameras.values(): move_preview_frames("clips")
jsmpeg.stop()
for preview in preview_recorders.values(): while True:
preview.stop() (topic, data) = detection_subscriber.check_for_update(timeout=0)
if birdseye is not None: if not topic:
birdseye.stop() break
config_subscriber.stop() (
websocket_server.manager.close_all() camera,
websocket_server.manager.stop() frame_name,
websocket_server.manager.join() frame_time,
websocket_server.shutdown() current_tracked_objects,
websocket_thread.join() motion_boxes,
logger.info("exiting output process...") regions,
) = data
frame = frame_manager.get(
frame_name, self.config.cameras[camera].frame_shape_yuv
)
frame_manager.close(frame_name)
detection_subscriber.stop()
for jsmpeg in jsmpeg_cameras.values():
jsmpeg.stop()
for preview in preview_recorders.values():
preview.stop()
if birdseye is not None:
birdseye.stop()
config_subscriber.stop()
websocket_server.manager.close_all()
websocket_server.manager.stop()
websocket_server.manager.join()
websocket_server.shutdown()
websocket_thread.join()
logger.info("exiting output process...")
def move_preview_frames(loc: str): def move_preview_frames(loc: str):

View File

@ -1,50 +1,40 @@
"""Run recording maintainer and cleanup.""" """Run recording maintainer and cleanup."""
import logging import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.models import Recordings, ReviewSegment from frigate.models import Recordings, ReviewSegment
from frigate.record.maintainer import RecordingMaintainer from frigate.record.maintainer import RecordingMaintainer
from frigate.util.services import listen from frigate.util import Process as FrigateProcess
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def manage_recordings(config: FrigateConfig) -> None: class RecordProcess(FrigateProcess):
stop_event = mp.Event() def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.recording_manager", daemon=True)
self.config = config
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def run(self) -> None:
stop_event.set() self.pre_run_setup()
db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in self.config.cameras.values() if c.enabled])
),
)
models = [ReviewSegment, Recordings]
db.bind(models)
signal.signal(signal.SIGTERM, receiveSignal) maintainer = RecordingMaintainer(
signal.signal(signal.SIGINT, receiveSignal) self.config,
self.stop_event,
threading.current_thread().name = "process:recording_manager" )
setproctitle("frigate.recording_manager") maintainer.start()
listen()
db = SqliteQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
)
models = [ReviewSegment, Recordings]
db.bind(models)
maintainer = RecordingMaintainer(
config,
stop_event,
)
maintainer.start()

View File

@ -1,36 +1,23 @@
"""Run recording maintainer and cleanup.""" """Run recording maintainer and cleanup."""
import logging import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from setproctitle import setproctitle
import frigate.util as util
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.review.maintainer import ReviewSegmentMaintainer from frigate.review.maintainer import ReviewSegmentMaintainer
from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def manage_review_segments(config: FrigateConfig) -> None: class ReviewProcess(util.Process):
stop_event = mp.Event() def __init__(self, config: FrigateConfig) -> None:
super().__init__(name="frigate.review_segment_manager", daemon=True)
self.config = config
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def run(self) -> None:
stop_event.set() self.pre_run_setup()
maintainer = ReviewSegmentMaintainer(
signal.signal(signal.SIGTERM, receiveSignal) self.config,
signal.signal(signal.SIGINT, receiveSignal) self.stop_event,
)
threading.current_thread().name = "process:review_segment_manager" maintainer.start()
setproctitle("frigate.review_segment_manager")
listen()
maintainer = ReviewSegmentMaintainer(
config,
stop_event,
)
maintainer.start()

View File

@ -5,13 +5,13 @@ import os
import shutil import shutil
import time import time
from json import JSONDecodeError from json import JSONDecodeError
from multiprocessing.managers import DictProxy
from typing import Any, Optional from typing import Any, Optional
import psutil import psutil
import requests import requests
from requests.exceptions import RequestException from requests.exceptions import RequestException
from frigate.camera import CameraMetrics
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.data_processing.types import DataProcessorMetrics from frigate.data_processing.types import DataProcessorMetrics
@ -53,7 +53,7 @@ def get_latest_version(config: FrigateConfig) -> str:
def stats_init( def stats_init(
config: FrigateConfig, config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics], camera_metrics: DictProxy,
embeddings_metrics: DataProcessorMetrics | None, embeddings_metrics: DataProcessorMetrics | None,
detectors: dict[str, ObjectDetectProcess], detectors: dict[str, ObjectDetectProcess],
processes: dict[str, int], processes: dict[str, int],
@ -271,10 +271,12 @@ def stats_snapshot(
stats["cameras"] = {} stats["cameras"] = {}
for name, camera_stats in camera_metrics.items(): for name, camera_stats in camera_metrics.items():
total_detection_fps += camera_stats.detection_fps.value total_detection_fps += camera_stats.detection_fps.value
pid = camera_stats.process.pid if camera_stats.process else None pid = camera_stats.process_pid.value if camera_stats.process_pid.value else None
ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None ffmpeg_pid = camera_stats.ffmpeg_pid.value if camera_stats.ffmpeg_pid else None
capture_pid = ( capture_pid = (
camera_stats.capture_process.pid if camera_stats.capture_process else None camera_stats.capture_process_pid.value
if camera_stats.capture_process_pid.value
else None
) )
stats["cameras"][name] = { stats["cameras"][name] = {
"camera_fps": round(camera_stats.camera_fps.value, 2), "camera_fps": round(camera_stats.camera_fps.value, 2),

View File

@ -341,11 +341,14 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
def empty_and_close_queue(q: mp.Queue): def empty_and_close_queue(q: mp.Queue):
while True: while True:
try: try:
q.get(block=True, timeout=0.5) try:
except queue.Empty: q.get(block=True, timeout=0.5)
q.close() except (queue.Empty, EOFError):
q.join_thread() q.close()
return q.join_thread()
return
except AttributeError:
pass
def generate_color_palette(n): def generate_color_palette(n):

View File

@ -4,9 +4,8 @@ import multiprocessing as mp
import signal import signal
import sys import sys
import threading import threading
from functools import wraps
from logging.handlers import QueueHandler from logging.handlers import QueueHandler
from typing import Any, Callable, Optional from typing import Callable, Optional
import frigate.log import frigate.log
@ -30,34 +29,12 @@ class BaseProcess(mp.Process):
super().start(*args, **kwargs) super().start(*args, **kwargs)
self.after_start() self.after_start()
def __getattribute__(self, name: str) -> Any:
if name == "run":
run = super().__getattribute__("run")
@wraps(run)
def run_wrapper(*args, **kwargs):
try:
self.before_run()
return run(*args, **kwargs)
finally:
self.after_run()
return run_wrapper
return super().__getattribute__(name)
def before_start(self) -> None: def before_start(self) -> None:
pass pass
def after_start(self) -> None: def after_start(self) -> None:
pass pass
def before_run(self) -> None:
pass
def after_run(self) -> None:
pass
class Process(BaseProcess): class Process(BaseProcess):
logger: logging.Logger logger: logging.Logger
@ -73,7 +50,7 @@ class Process(BaseProcess):
def before_start(self) -> None: def before_start(self) -> None:
self.__log_queue = frigate.log.log_listener.queue self.__log_queue = frigate.log.log_listener.queue
def before_run(self) -> None: def pre_run_setup(self) -> None:
faulthandler.enable() faulthandler.enable()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
@ -88,8 +65,6 @@ class Process(BaseProcess):
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal) signal.signal(signal.SIGINT, receiveSignal)
self.logger = logging.getLogger(self.name) self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True) logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue)) logging.getLogger().addHandler(QueueHandler(self.__log_queue))

View File

@ -1,9 +1,7 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp
import os import os
import queue import queue
import signal
import subprocess as sp import subprocess as sp
import threading import threading
import time import time
@ -12,8 +10,8 @@ from multiprocessing.synchronize import Event as MpEvent
from typing import Any from typing import Any
import cv2 import cv2
from setproctitle import setproctitle
import frigate.util as util
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config import CameraConfig, DetectConfig, ModelConfig
@ -53,7 +51,6 @@ from frigate.util.object import (
is_object_filtered, is_object_filtered,
reduce_detections, reduce_detections,
) )
from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -318,7 +315,7 @@ class CameraWatchdog(threading.Thread):
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
) )
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture( self.capture_thread = CameraCaptureRunner(
self.config, self.config,
self.shm_frame_count, self.shm_frame_count,
self.frame_index, self.frame_index,
@ -396,7 +393,7 @@ class CameraWatchdog(threading.Thread):
return newest_segment_time return newest_segment_time
class CameraCapture(threading.Thread): class CameraCaptureRunner(threading.Thread):
def __init__( def __init__(
self, self,
config: CameraConfig, config: CameraConfig,
@ -440,103 +437,103 @@ class CameraCapture(threading.Thread):
) )
def capture_camera( class CameraCapture(util.Process):
config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics def __init__(
): self, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics
stop_event = mp.Event() ) -> None:
super().__init__(name=f"camera_capture:{config.name}", daemon=True)
self.config = config
self.shm_frame_count = shm_frame_count
self.camera_metrics = camera_metrics
def receiveSignal(signalNumber, frame): def run(self) -> None:
stop_event.set() self.pre_run_setup()
camera_watchdog = CameraWatchdog(
signal.signal(signal.SIGTERM, receiveSignal) self.config,
signal.signal(signal.SIGINT, receiveSignal) self.shm_frame_count,
self.camera_metrics.frame_queue,
threading.current_thread().name = f"capture:{config.name}" self.camera_metrics.camera_fps,
setproctitle(f"frigate.capture:{config.name}") self.camera_metrics.skipped_fps,
self.camera_metrics.ffmpeg_pid,
camera_watchdog = CameraWatchdog( self.stop_event,
config, )
shm_frame_count, camera_watchdog.start()
camera_metrics.frame_queue, camera_watchdog.join()
camera_metrics.camera_fps,
camera_metrics.skipped_fps,
camera_metrics.ffmpeg_pid,
stop_event,
)
camera_watchdog.start()
camera_watchdog.join()
def track_camera( class CameraTracker(util.Process):
name, def __init__(
config: CameraConfig, self,
model_config: ModelConfig, config: CameraConfig,
labelmap: dict[int, str], model_config: ModelConfig,
detection_queue: Queue, labelmap: dict[int, str],
detected_objects_queue, detection_queue: Queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
region_grid: list[list[dict[str, Any]]],
):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"process:{name}"
setproctitle(f"frigate.process:{name}")
listen()
frame_queue = camera_metrics.frame_queue
frame_shape = config.frame_shape
motion_detector = ImprovedMotionDetector(
frame_shape,
config.motion,
config.detect.fps,
name=config.name,
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
name,
requestor,
frame_queue,
frame_shape,
model_config,
config,
frame_manager,
motion_detector,
object_detector,
object_tracker,
detected_objects_queue, detected_objects_queue,
camera_metrics, camera_metrics: CameraMetrics,
stop_event, ptz_metrics: PTZMetrics,
ptz_metrics, region_grid: list[list[dict[str, Any]]],
region_grid, ) -> None:
) super().__init__(name=f"camera_processor:{config.name}", daemon=True)
self.config = config
self.model_config = model_config
self.labelmap = labelmap
self.detection_queue = detection_queue
self.detected_objects_queue = detected_objects_queue
self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics
self.region_grid = region_grid
# empty the frame queue def run(self) -> None:
logger.info(f"{name}: emptying frame queue") self.pre_run_setup()
while not frame_queue.empty(): frame_queue = self.camera_metrics.frame_queue
(frame_name, _) = frame_queue.get(False) frame_shape = self.config.frame_shape
frame_manager.delete(frame_name)
logger.info(f"{name}: exiting subprocess") motion_detector = ImprovedMotionDetector(
frame_shape,
self.config.motion,
self.config.detect.fps,
name=self.config.name,
ptz_metrics=self.ptz_metrics,
)
object_detector = RemoteObjectDetector(
self.config.name,
self.labelmap,
self.detection_queue,
self.model_config,
self.stop_event,
)
object_tracker = NorfairTracker(self.config, self.ptz_metrics)
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
requestor,
frame_queue,
frame_shape,
self.model_config,
self.config,
frame_manager,
motion_detector,
object_detector,
object_tracker,
self.detected_objects_queue,
self.camera_metrics,
self.stop_event,
self.ptz_metrics,
self.region_grid,
)
# empty the frame queue
logger.info(f"{self.config.name}: emptying frame queue")
while not frame_queue.empty():
(frame_name, _) = frame_queue.get(False)
frame_manager.delete(frame_name)
logger.info(f"{self.config.name}: exiting subprocess")
def detect( def detect(
@ -577,7 +574,6 @@ def detect(
def process_frames( def process_frames(
camera_name: str,
requestor: InterProcessRequestor, requestor: InterProcessRequestor,
frame_queue: Queue, frame_queue: Queue,
frame_shape: tuple[int, int], frame_shape: tuple[int, int],
@ -597,7 +593,7 @@ def process_frames(
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)
config_subscriber = CameraConfigUpdateSubscriber( config_subscriber = CameraConfigUpdateSubscriber(
None, None,
{camera_name: camera_config}, {camera_config.name: camera_config},
[ [
CameraConfigUpdateEnum.detect, CameraConfigUpdateEnum.detect,
CameraConfigUpdateEnum.enabled, CameraConfigUpdateEnum.enabled,
@ -653,7 +649,9 @@ def process_frames(
and prev_enabled != camera_enabled and prev_enabled != camera_enabled
and camera_metrics.frame_queue.empty() and camera_metrics.frame_queue.empty()
): ):
logger.debug(f"Camera {camera_name} disabled, clearing tracked objects") logger.debug(
f"Camera {camera_config.name} disabled, clearing tracked objects"
)
prev_enabled = camera_enabled prev_enabled = camera_enabled
# Clear norfair's dictionaries # Clear norfair's dictionaries
@ -678,7 +676,7 @@ def process_frames(
datetime.datetime.now().astimezone(datetime.timezone.utc) datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update > next_region_update
): ):
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name) region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)
try: try:
@ -698,7 +696,9 @@ def process_frames(
frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1])) frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
if frame is None: if frame is None:
logger.debug(f"{camera_name}: frame {frame_time} is not in memory store.") logger.debug(
f"{camera_config.name}: frame {frame_time} is not in memory store."
)
continue continue
# look for motion if enabled # look for motion if enabled
@ -937,7 +937,7 @@ def process_frames(
) )
cv2.imwrite( cv2.imwrite(
f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg", f"debug/frames/{camera_config.name}-{'{:.6f}'.format(frame_time)}.jpg",
bgr_frame, bgr_frame,
) )
# add to the queue if not full # add to the queue if not full
@ -949,7 +949,7 @@ def process_frames(
camera_metrics.process_fps.value = fps_tracker.eps() camera_metrics.process_fps.value = fps_tracker.eps()
detected_objects_queue.put( detected_objects_queue.put(
( (
camera_name, camera_config.name,
frame_name, frame_name,
frame_time, frame_time,
detections, detections,

View File

@ -173,7 +173,7 @@ export default function CameraMetrics({
}); });
series[key]["detect"].data.push({ series[key]["detect"].data.push({
x: statsIdx, x: statsIdx,
y: stats.cpu_usages[camStats.pid.toString()].cpu, y: stats.cpu_usages[camStats.pid?.toString()]?.cpu,
}); });
}); });
}); });