mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-08-31 13:48:19 +02:00
* backend refactor shm calculation to utility function so it can be used in frontend stats * frontend * fix check * clean up
221 lines
8.4 KiB
Python
221 lines
8.4 KiB
Python
"""Create and maintain camera processes / management."""
|
|
|
|
import logging
|
|
import multiprocessing as mp
|
|
import threading
|
|
from multiprocessing import Queue
|
|
from multiprocessing.managers import DictProxy, SyncManager
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
|
|
from frigate.camera import CameraMetrics, PTZMetrics
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera import CameraConfig
|
|
from frigate.config.camera.updater import (
|
|
CameraConfigUpdateEnum,
|
|
CameraConfigUpdateSubscriber,
|
|
)
|
|
from frigate.models import Regions
|
|
from frigate.util.builtin import empty_and_close_queue
|
|
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
|
|
from frigate.util.object import get_camera_regions_grid
|
|
from frigate.util.services import calculate_shm_requirements
|
|
from frigate.video import CameraCapture, CameraTracker
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraMaintainer(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: FrigateConfig,
|
|
detection_queue: Queue,
|
|
detected_frames_queue: Queue,
|
|
camera_metrics: DictProxy,
|
|
ptz_metrics: dict[str, PTZMetrics],
|
|
stop_event: MpEvent,
|
|
metrics_manager: SyncManager,
|
|
):
|
|
super().__init__(name="camera_processor")
|
|
self.config = config
|
|
self.detection_queue = detection_queue
|
|
self.detected_frames_queue = detected_frames_queue
|
|
self.stop_event = stop_event
|
|
self.camera_metrics = camera_metrics
|
|
self.ptz_metrics = ptz_metrics
|
|
self.frame_manager = SharedMemoryFrameManager()
|
|
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
|
|
self.update_subscriber = CameraConfigUpdateSubscriber(
|
|
self.config,
|
|
{},
|
|
[
|
|
CameraConfigUpdateEnum.add,
|
|
CameraConfigUpdateEnum.remove,
|
|
],
|
|
)
|
|
self.shm_count = self.__calculate_shm_frame_count()
|
|
self.camera_processes: dict[str, mp.Process] = {}
|
|
self.capture_processes: dict[str, mp.Process] = {}
|
|
self.metrics_manager = metrics_manager
|
|
|
|
def __init_historical_regions(self) -> None:
|
|
# delete region grids for removed or renamed cameras
|
|
cameras = list(self.config.cameras.keys())
|
|
Regions.delete().where(~(Regions.camera << cameras)).execute()
|
|
|
|
# create or update region grids for each camera
|
|
for camera in self.config.cameras.values():
|
|
assert camera.name is not None
|
|
self.region_grids[camera.name] = get_camera_regions_grid(
|
|
camera.name,
|
|
camera.detect,
|
|
max(self.config.model.width, self.config.model.height),
|
|
)
|
|
|
|
def __calculate_shm_frame_count(self) -> int:
|
|
shm_stats = calculate_shm_requirements(self.config)
|
|
|
|
if not shm_stats:
|
|
# /dev/shm not available
|
|
return 0
|
|
|
|
logger.debug(
|
|
f"Calculated total camera size {shm_stats['available']} / "
|
|
f"{shm_stats['camera_frame_size']} :: {shm_stats['shm_frame_count']} "
|
|
f"frames for each camera in SHM"
|
|
)
|
|
|
|
if shm_stats["shm_frame_count"] < 20:
|
|
logger.warning(
|
|
f"The current SHM size of {shm_stats['total']}MB is too small, "
|
|
f"recommend increasing it to at least {shm_stats['min_shm']}MB."
|
|
)
|
|
|
|
return shm_stats["shm_frame_count"]
|
|
|
|
def __start_camera_processor(
|
|
self, name: str, config: CameraConfig, runtime: bool = False
|
|
) -> None:
|
|
if not config.enabled_in_config:
|
|
logger.info(f"Camera processor not started for disabled camera {name}")
|
|
return
|
|
|
|
if runtime:
|
|
self.camera_metrics[name] = CameraMetrics(self.metrics_manager)
|
|
self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
|
|
self.region_grids[name] = get_camera_regions_grid(
|
|
name,
|
|
config.detect,
|
|
max(self.config.model.width, self.config.model.height),
|
|
)
|
|
|
|
try:
|
|
largest_frame = max(
|
|
[
|
|
det.model.height * det.model.width * 3
|
|
if det.model is not None
|
|
else 320
|
|
for det in self.config.detectors.values()
|
|
]
|
|
)
|
|
UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4)
|
|
UntrackedSharedMemory(
|
|
name=name,
|
|
create=True,
|
|
size=largest_frame,
|
|
)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
camera_process = CameraTracker(
|
|
config,
|
|
self.config.model,
|
|
self.config.model.merged_labelmap,
|
|
self.detection_queue,
|
|
self.detected_frames_queue,
|
|
self.camera_metrics[name],
|
|
self.ptz_metrics[name],
|
|
self.region_grids[name],
|
|
self.stop_event,
|
|
)
|
|
self.camera_processes[config.name] = camera_process
|
|
camera_process.start()
|
|
self.camera_metrics[config.name].process_pid.value = camera_process.pid
|
|
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
|
|
|
|
def __start_camera_capture(
|
|
self, name: str, config: CameraConfig, runtime: bool = False
|
|
) -> None:
|
|
if not config.enabled_in_config:
|
|
logger.info(f"Capture process not started for disabled camera {name}")
|
|
return
|
|
|
|
# pre-create shms
|
|
count = 10 if runtime else self.shm_count
|
|
for i in range(count):
|
|
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
|
|
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
|
|
|
|
capture_process = CameraCapture(
|
|
config, count, self.camera_metrics[name], self.stop_event
|
|
)
|
|
capture_process.daemon = True
|
|
self.capture_processes[name] = capture_process
|
|
capture_process.start()
|
|
self.camera_metrics[name].capture_process_pid.value = capture_process.pid
|
|
logger.info(f"Capture process started for {name}: {capture_process.pid}")
|
|
|
|
def __stop_camera_capture_process(self, camera: str) -> None:
|
|
capture_process = self.capture_processes[camera]
|
|
if capture_process is not None:
|
|
logger.info(f"Waiting for capture process for {camera} to stop")
|
|
capture_process.terminate()
|
|
capture_process.join()
|
|
|
|
def __stop_camera_process(self, camera: str) -> None:
|
|
camera_process = self.camera_processes[camera]
|
|
if camera_process is not None:
|
|
logger.info(f"Waiting for process for {camera} to stop")
|
|
camera_process.terminate()
|
|
camera_process.join()
|
|
logger.info(f"Closing frame queue for {camera}")
|
|
empty_and_close_queue(self.camera_metrics[camera].frame_queue)
|
|
|
|
def run(self):
|
|
self.__init_historical_regions()
|
|
|
|
# start camera processes
|
|
for camera, config in self.config.cameras.items():
|
|
self.__start_camera_processor(camera, config)
|
|
self.__start_camera_capture(camera, config)
|
|
|
|
while not self.stop_event.wait(1):
|
|
updates = self.update_subscriber.check_for_updates()
|
|
|
|
for update_type, updated_cameras in updates.items():
|
|
if update_type == CameraConfigUpdateEnum.add.name:
|
|
for camera in updated_cameras:
|
|
self.__start_camera_processor(
|
|
camera,
|
|
self.update_subscriber.camera_configs[camera],
|
|
runtime=True,
|
|
)
|
|
self.__start_camera_capture(
|
|
camera,
|
|
self.update_subscriber.camera_configs[camera],
|
|
runtime=True,
|
|
)
|
|
elif update_type == CameraConfigUpdateEnum.remove.name:
|
|
self.__stop_camera_capture_process(camera)
|
|
self.__stop_camera_process(camera)
|
|
|
|
# ensure the capture processes are done
|
|
for camera in self.camera_processes.keys():
|
|
self.__stop_camera_capture_process(camera)
|
|
|
|
# ensure the camera processors are done
|
|
for camera in self.capture_processes.keys():
|
|
self.__stop_camera_process(camera)
|
|
|
|
self.update_subscriber.stop()
|
|
self.frame_manager.cleanup()
|