From dd3dc7949a6cec05c37fd31045de259e28616b05 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 14 Feb 2024 17:24:36 -0700 Subject: [PATCH] Use zmq for inter process communication (#9309) * Use zmq for inter process communication * Use localhost for reply and request * Use pyobj instead of json and Need to use separate requestors for each audio listener * Cleanup port defining --- docker/main/requirements-wheels.txt | 1 + frigate/app.py | 13 +------ frigate/comms/dispatcher.py | 15 ++++---- frigate/comms/inter_process.py | 59 ++++++++++++++++++++++------- frigate/config.py | 9 ++++- frigate/const.py | 4 ++ frigate/events/audio.py | 26 +++++-------- frigate/ffmpeg_presets.py | 2 +- frigate/output/output.py | 3 +- frigate/output/preview.py | 38 ++++++++++--------- frigate/record/maintainer.py | 13 +++++-- frigate/record/record.py | 2 - frigate/video.py | 22 +++++------ 13 files changed, 116 insertions(+), 91 deletions(-) diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt index a02be24e8..9223a441f 100644 --- a/docker/main/requirements-wheels.txt +++ b/docker/main/requirements-wheels.txt @@ -15,6 +15,7 @@ pydantic == 1.10.* git+https://github.com/fbcotter/py3nvml#egg=py3nvml PyYAML == 6.0.* pytz == 2023.3.post1 +pyzmq == 25.1.* ruamel.yaml == 0.18.* tzlocal == 5.2 types-PyYAML == 6.0.* diff --git a/frigate/app.py b/frigate/app.py index 080261d2a..08bb694a0 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -171,7 +171,6 @@ class FrigateApp: # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards "frame_queue": mp.Queue(maxsize=2), - "region_grid_queue": mp.Queue(maxsize=1), "capture_process": None, "process": None, "audio_rms": mp.Value("d", 0.0), # type: ignore[typeddict-item] @@ -273,9 +272,6 @@ class FrigateApp: # Queue for timeline events self.timeline_queue: Queue = mp.Queue() - # Queue for inter process communication - self.inter_process_queue: Queue = mp.Queue() - def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: logger.info("Running database vacuum") @@ -350,7 +346,6 @@ class FrigateApp: name="recording_manager", args=( self.config, - self.inter_process_queue, self.object_recordings_info_queue, self.audio_recordings_info_queue, self.feature_metrics, @@ -390,9 +385,7 @@ class FrigateApp: ) def init_inter_process_communicator(self) -> None: - self.inter_process_communicator = InterProcessCommunicator( - self.inter_process_queue - ) + self.inter_process_communicator = InterProcessCommunicator() def init_web_server(self) -> None: self.flask_app = create_app( @@ -495,7 +488,6 @@ class FrigateApp: args=( self.config, self.video_output_queue, - self.inter_process_queue, self.camera_metrics, ), ) @@ -534,7 +526,6 @@ class FrigateApp: self.detection_queue, self.detection_out_events[name], self.detected_frames_queue, - self.inter_process_queue, self.camera_metrics[name], self.ptz_metrics[name], self.region_grids[name], @@ -571,7 +562,6 @@ class FrigateApp: self.audio_recordings_info_queue, self.camera_metrics, self.feature_metrics, - self.inter_process_communicator, ), ) audio_process.daemon = True @@ -777,7 +767,6 @@ class FrigateApp: self.object_recordings_info_queue, self.audio_recordings_info_queue, self.log_queue, - self.inter_process_queue, ]: if queue is not None: while not queue.empty(): diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index d83371c01..0af4ff37d 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -2,7 +2,7 @@ import logging from abc import ABC, abstractmethod -from typing import Any, Callable +from typing import Any, Callable, Optional from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID @@ -70,7 +70,7 @@ class Dispatcher: for comm in self.comms: comm.subscribe(self._receive) - def _receive(self, topic: str, payload: str) -> None: + def _receive(self, topic: str, payload: str) -> Optional[Any]: """Handle receiving of payload from communicators.""" if topic.endswith("set"): try: @@ -95,13 +95,12 @@ class Dispatcher: Recordings.insert_many(payload).execute() elif topic == REQUEST_REGION_GRID: camera = payload - self.camera_metrics[camera]["region_grid_queue"].put( - get_camera_regions_grid( - camera, - self.config.cameras[camera].detect, - max(self.config.model.width, self.config.model.height), - ) + grid = get_camera_regions_grid( + camera, + self.config.cameras[camera].detect, + max(self.config.model.width, self.config.model.height), ) + return grid elif topic == INSERT_PREVIEW: Previews.insert(payload).execute() else: diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index 74ce9bc0c..c312bf869 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -1,16 +1,25 @@ +"""Facilitates communication between processes.""" + import multiprocessing as mp -import queue +import os import threading -from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent from typing import Callable +import zmq + from frigate.comms.dispatcher import Communicator +from frigate.const import PORT_INTER_PROCESS_COMM class InterProcessCommunicator(Communicator): - def __init__(self, queue: Queue) -> None: - self.queue = queue + def __init__(self) -> None: + INTER_PROCESS_COMM_PORT = ( + os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM + ) + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REP) + self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}") self.stop_event: MpEvent = mp.Event() def publish(self, topic: str, payload: str, retain: bool) -> None: @@ -23,17 +32,41 @@ class InterProcessCommunicator(Communicator): self.reader_thread.start() def read(self) -> None: - while not self.stop_event.is_set(): - try: - ( - topic, - value, - ) = self.queue.get(True, 1) - except queue.Empty: - continue + while not self.stop_event.wait(0.5): + while True: # load all messages that are queued + try: + (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) - self._dispatcher(topic, value) + response = self._dispatcher(topic, value) + + if response is not None: + self.socket.send_pyobj(response) + else: + self.socket.send_pyobj([]) + except zmq.ZMQError: + break def stop(self) -> None: self.stop_event.set() self.reader_thread.join() + self.socket.close() + self.context.destroy() + + +class InterProcessRequestor: + """Simplifies sending data to InterProcessCommunicator and getting a reply.""" + + def __init__(self) -> None: + port = os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(f"tcp://127.0.0.1:{port}") + + def send_data(self, topic: str, data: any) -> any: + """Sends data and then waits for reply.""" + self.socket.send_pyobj((topic, data)) + return self.socket.recv_pyobj() + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/config.py b/frigate/config.py index 8d2ed7ed9..5ab51b026 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -887,6 +887,10 @@ class CameraConfig(FrigateBaseModel): global_args = get_ffmpeg_arg_list( ffmpeg_input.global_args or self.ffmpeg.global_args ) + + camera_arg = ( + self.ffmpeg.hwaccel_args if self.ffmpeg.hwaccel_args != "auto" else None + ) hwaccel_args = get_ffmpeg_arg_list( parse_preset_hardware_acceleration_decode( ffmpeg_input.hwaccel_args, @@ -896,12 +900,13 @@ class CameraConfig(FrigateBaseModel): ) or ffmpeg_input.hwaccel_args or parse_preset_hardware_acceleration_decode( - self.ffmpeg.hwaccel_args, + camera_arg, self.detect.fps, self.detect.width, self.detect.height, ) - or self.ffmpeg.hwaccel_args + or camera_arg + or [] ) input_args = get_ffmpeg_arg_list( parse_preset_input(ffmpeg_input.input_args, self.detect.fps) diff --git a/frigate/const.py b/frigate/const.py index 73f66af2f..158996116 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -57,6 +57,10 @@ DRIVER_AMD = "radeonsi" DRIVER_INTEL_i965 = "i965" DRIVER_INTEL_iHD = "iHD" +# Ports + +PORT_INTER_PROCESS_COMM = 4892 + # Record Values CACHE_SEGMENT_FORMAT = "%Y%m%d%H%M%S%z" diff --git a/frigate/events/audio.py b/frigate/events/audio.py index ed457adf1..a28b00e68 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -13,7 +13,7 @@ import numpy as np import requests from setproctitle import setproctitle -from frigate.comms.inter_process import InterProcessCommunicator +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig from frigate.const import ( AUDIO_DURATION, @@ -70,7 +70,6 @@ def listen_to_audio( recordings_info_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], process_info: dict[str, FeatureMetricsTypes], - inter_process_communicator: InterProcessCommunicator, ) -> None: stop_event = mp.Event() audio_threads: list[threading.Thread] = [] @@ -100,7 +99,6 @@ def listen_to_audio( camera_metrics, process_info, stop_event, - inter_process_communicator, ) audio_threads.append(audio) audio.start() @@ -174,7 +172,6 @@ class AudioEventMaintainer(threading.Thread): camera_metrics: dict[str, CameraMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, - inter_process_communicator: InterProcessCommunicator, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" @@ -182,7 +179,6 @@ class AudioEventMaintainer(threading.Thread): self.recordings_info_queue = recordings_info_queue self.camera_metrics = camera_metrics self.feature_metrics = feature_metrics - self.inter_process_communicator = inter_process_communicator self.detections: dict[dict[str, any]] = {} self.stop_event = stop_event self.detector = AudioTfl(stop_event, self.config.audio.num_threads) @@ -193,6 +189,9 @@ class AudioEventMaintainer(threading.Thread): self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") self.audio_listener = None + # create communication for audio detections + self.requestor = InterProcessRequestor() + def detect_audio(self, audio) -> None: if not self.feature_metrics[self.config.name]["audio_enabled"].value: return @@ -245,12 +244,8 @@ class AudioEventMaintainer(threading.Thread): else: dBFS = 0 - self.inter_process_communicator.queue.put( - (f"{self.config.name}/audio/dBFS", float(dBFS)) - ) - self.inter_process_communicator.queue.put( - (f"{self.config.name}/audio/rms", float(rms)) - ) + self.requestor.send_data(f"{self.config.name}/audio/dBFS", float(dBFS)) + self.requestor.send_data(f"{self.config.name}/audio/rms", float(rms)) return float(rms), float(dBFS) @@ -260,9 +255,7 @@ class AudioEventMaintainer(threading.Thread): "last_detection" ] = datetime.datetime.now().timestamp() else: - self.inter_process_communicator.queue.put( - (f"{self.config.name}/audio/{label}", "ON") - ) + self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON") resp = requests.post( f"{FRIGATE_LOCALHOST}/api/events/{self.config.name}/{label}/create", @@ -288,8 +281,8 @@ class AudioEventMaintainer(threading.Thread): now - detection.get("last_detection", now) > self.config.audio.max_not_heard ): - self.inter_process_communicator.queue.put( - (f"{self.config.name}/audio/{detection['label']}", "OFF") + self.requestor.send_data( + f"{self.config.name}/audio/{detection['label']}", "OFF" ) resp = requests.put( @@ -350,3 +343,4 @@ class AudioEventMaintainer(threading.Thread): stop_ffmpeg(self.audio_listener, self.logger) self.logpipe.close() + self.requestor.stop() diff --git a/frigate/ffmpeg_presets.py b/frigate/ffmpeg_presets.py index 96314e6a5..36b33d327 100644 --- a/frigate/ffmpeg_presets.py +++ b/frigate/ffmpeg_presets.py @@ -175,7 +175,7 @@ def parse_preset_hardware_acceleration_scale( if not isinstance(arg, str) or " " in arg: scale = PRESETS_HW_ACCEL_SCALE["default"] else: - scale = PRESETS_HW_ACCEL_SCALE.get(arg, "") + scale = PRESETS_HW_ACCEL_SCALE.get(arg, PRESETS_HW_ACCEL_SCALE["default"]) scale = scale.format(fps, width, height).split(" ") scale.extend(detect_args) diff --git a/frigate/output/output.py b/frigate/output/output.py index 2dd9dd082..efcff4b7c 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -30,7 +30,6 @@ logger = logging.getLogger(__name__) def output_frames( config: FrigateConfig, video_output_queue: mp.Queue, - inter_process_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], ): threading.current_thread().name = "output" @@ -68,7 +67,7 @@ def output_frames( continue jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) - preview_recorders[camera] = PreviewRecorder(cam_config, inter_process_queue) + preview_recorders[camera] = PreviewRecorder(cam_config) if config.birdseye.enabled: birdseye = Birdseye( diff --git a/frigate/output/preview.py b/frigate/output/preview.py index 784051f47..61ca248b7 100644 --- a/frigate/output/preview.py +++ b/frigate/output/preview.py @@ -2,7 +2,6 @@ import datetime import logging -import multiprocessing as mp import os import shutil import subprocess as sp @@ -12,6 +11,7 @@ from pathlib import Path import cv2 import numpy as np +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, RecordQualityEnum from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW from frigate.ffmpeg_presets import ( @@ -53,13 +53,13 @@ class FFMpegConverter(threading.Thread): self, config: CameraConfig, frame_times: list[float], - inter_process_queue: mp.Queue, + requestor: InterProcessRequestor, ): threading.Thread.__init__(self) self.name = f"{config.name}_preview_converter" self.config = config self.frame_times = frame_times - self.inter_process_queue = inter_process_queue + self.requestor = requestor self.path = os.path.join( CLIPS_DIR, f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4", @@ -105,18 +105,16 @@ class FFMpegConverter(threading.Thread): if p.returncode == 0: logger.debug("successfully saved preview") - self.inter_process_queue.put_nowait( - ( - INSERT_PREVIEW, - { - Previews.id: f"{self.config.name}_{end}", - Previews.camera: self.config.name, - Previews.path: self.path, - Previews.start_time: start, - Previews.end_time: end, - Previews.duration: end - start, - }, - ) + self.requestor.send_data( + INSERT_PREVIEW, + { + Previews.id: f"{self.config.name}_{end}", + Previews.camera: self.config.name, + Previews.path: self.path, + Previews.start_time: start, + Previews.end_time: end, + Previews.duration: end - start, + }, ) else: logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}") @@ -128,9 +126,8 @@ class FFMpegConverter(threading.Thread): class PreviewRecorder: - def __init__(self, config: CameraConfig, inter_process_queue: mp.Queue) -> None: + def __init__(self, config: CameraConfig) -> None: self.config = config - self.inter_process_queue = inter_process_queue self.start_time = 0 self.last_output_time = 0 self.output_frames = [] @@ -139,6 +136,9 @@ class PreviewRecorder: int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4 ) + # create communication for finished previews + self.requestor = InterProcessRequestor() + y, u1, u2, v1, v2 = get_yuv_crop( self.config.frame_shape_yuv, ( @@ -237,7 +237,7 @@ class PreviewRecorder: FFMpegConverter( self.config, self.output_frames, - self.inter_process_queue, + self.requestor, ).start() # reset frame cache @@ -262,3 +262,5 @@ class PreviewRecorder: shutil.rmtree(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES)) except FileNotFoundError: pass + + self.requestor.stop() diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 773a29a40..a67d84c16 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -17,6 +17,7 @@ from typing import Any, Optional, Tuple import numpy as np import psutil +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( CACHE_DIR, @@ -59,7 +60,6 @@ class RecordingMaintainer(threading.Thread): def __init__( self, config: FrigateConfig, - inter_process_queue: mp.Queue, object_recordings_info_queue: mp.Queue, audio_recordings_info_queue: Optional[mp.Queue], process_info: dict[str, FeatureMetricsTypes], @@ -68,7 +68,10 @@ class RecordingMaintainer(threading.Thread): threading.Thread.__init__(self) self.name = "recording_maintainer" self.config = config - self.inter_process_queue = inter_process_queue + + # create communication for retained recordings + self.requestor = InterProcessRequestor() + self.object_recordings_info_queue = object_recordings_info_queue self.audio_recordings_info_queue = audio_recordings_info_queue self.process_info = process_info @@ -183,8 +186,9 @@ class RecordingMaintainer(threading.Thread): recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) # fire and forget recordings entries - self.inter_process_queue.put( - (INSERT_MANY_RECORDINGS, [r for r in recordings_to_insert if r is not None]) + self.requestor.send_data( + INSERT_MANY_RECORDINGS, + [r for r in recordings_to_insert if r is not None], ) async def validate_and_move_segment( @@ -525,4 +529,5 @@ class RecordingMaintainer(threading.Thread): duration = datetime.datetime.now().timestamp() - run_start wait_time = max(0, 5 - duration) + self.requestor.stop() logger.info("Exiting recording maintenance...") diff --git a/frigate/record/record.py b/frigate/record/record.py index ca4400e57..8fc2ed2b0 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -21,7 +21,6 @@ logger = logging.getLogger(__name__) def manage_recordings( config: FrigateConfig, - inter_process_queue: mp.Queue, object_recordings_info_queue: mp.Queue, audio_recordings_info_queue: mp.Queue, process_info: dict[str, FeatureMetricsTypes], @@ -52,7 +51,6 @@ def manage_recordings( maintainer = RecordingMaintainer( config, - inter_process_queue, object_recordings_info_queue, audio_recordings_info_queue, process_info, diff --git a/frigate/video.py b/frigate/video.py index b5fafec0b..774da4c99 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import time import cv2 from setproctitle import setproctitle +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.const import ( ALL_ATTRIBUTE_LABELS, @@ -388,7 +389,6 @@ def track_camera( detection_queue, result_connection, detected_objects_queue, - inter_process_queue, process_info, ptz_metrics, region_grid, @@ -406,7 +406,6 @@ def track_camera( listen() frame_queue = process_info["frame_queue"] - region_grid_queue = process_info["region_grid_queue"] detection_enabled = process_info["detection_enabled"] motion_enabled = process_info["motion_enabled"] improve_contrast_enabled = process_info["improve_contrast_enabled"] @@ -433,11 +432,13 @@ def track_camera( frame_manager = SharedMemoryFrameManager() + # create communication for region grid updates + requestor = InterProcessRequestor() + process_frames( name, - inter_process_queue, + requestor, frame_queue, - region_grid_queue, frame_shape, model_config, config.detect, @@ -505,9 +506,8 @@ def detect( def process_frames( camera_name: str, - inter_process_queue: mp.Queue, + requestor: InterProcessRequestor, frame_queue: mp.Queue, - region_grid_queue: mp.Queue, frame_shape, model_config: ModelConfig, detect_config: DetectConfig, @@ -544,13 +544,7 @@ def process_frames( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update ): - inter_process_queue.put((REQUEST_REGION_GRID, camera_name)) - - try: - region_grid = region_grid_queue.get(True, 10) - except queue.Empty: - logger.error(f"Unable to get updated region grid for {camera_name}") - + region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name) next_region_update = get_tomorrow_at_time(2) try: @@ -826,3 +820,5 @@ def process_frames( ) detection_fps.value = object_detector.fps.eps() frame_manager.close(f"{camera_name}{frame_time}") + + requestor.stop()