Use zmq for inter process communication (#9309)

* Use zmq for inter process communication

* Use localhost for reply and request

* Use pyobj instead of json and Need to use separate requestors for each audio listener

* Cleanup port defining
This commit is contained in:
Nicolas Mowen 2024-02-14 17:24:36 -07:00 committed by GitHub
parent 198dbbdff1
commit dd3dc7949a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 116 additions and 91 deletions

View File

@ -15,6 +15,7 @@ pydantic == 1.10.*
git+https://github.com/fbcotter/py3nvml#egg=py3nvml
PyYAML == 6.0.*
pytz == 2023.3.post1
pyzmq == 25.1.*
ruamel.yaml == 0.18.*
tzlocal == 5.2
types-PyYAML == 6.0.*

View File

@ -171,7 +171,6 @@ class FrigateApp:
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"frame_queue": mp.Queue(maxsize=2),
"region_grid_queue": mp.Queue(maxsize=1),
"capture_process": None,
"process": None,
"audio_rms": mp.Value("d", 0.0), # type: ignore[typeddict-item]
@ -273,9 +272,6 @@ class FrigateApp:
# Queue for timeline events
self.timeline_queue: Queue = mp.Queue()
# Queue for inter process communication
self.inter_process_queue: Queue = mp.Queue()
def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:
logger.info("Running database vacuum")
@ -350,7 +346,6 @@ class FrigateApp:
name="recording_manager",
args=(
self.config,
self.inter_process_queue,
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
self.feature_metrics,
@ -390,9 +385,7 @@ class FrigateApp:
)
def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator(
self.inter_process_queue
)
self.inter_process_communicator = InterProcessCommunicator()
def init_web_server(self) -> None:
self.flask_app = create_app(
@ -495,7 +488,6 @@ class FrigateApp:
args=(
self.config,
self.video_output_queue,
self.inter_process_queue,
self.camera_metrics,
),
)
@ -534,7 +526,6 @@ class FrigateApp:
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.inter_process_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
@ -571,7 +562,6 @@ class FrigateApp:
self.audio_recordings_info_queue,
self.camera_metrics,
self.feature_metrics,
self.inter_process_communicator,
),
)
audio_process.daemon = True
@ -777,7 +767,6 @@ class FrigateApp:
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
self.log_queue,
self.inter_process_queue,
]:
if queue is not None:
while not queue.empty():

View File

@ -2,7 +2,7 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable
from typing import Any, Callable, Optional
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID
@ -70,7 +70,7 @@ class Dispatcher:
for comm in self.comms:
comm.subscribe(self._receive)
def _receive(self, topic: str, payload: str) -> None:
def _receive(self, topic: str, payload: str) -> Optional[Any]:
"""Handle receiving of payload from communicators."""
if topic.endswith("set"):
try:
@ -95,13 +95,12 @@ class Dispatcher:
Recordings.insert_many(payload).execute()
elif topic == REQUEST_REGION_GRID:
camera = payload
self.camera_metrics[camera]["region_grid_queue"].put(
get_camera_regions_grid(
grid = get_camera_regions_grid(
camera,
self.config.cameras[camera].detect,
max(self.config.model.width, self.config.model.height),
)
)
return grid
elif topic == INSERT_PREVIEW:
Previews.insert(payload).execute()
else:

View File

@ -1,16 +1,25 @@
"""Facilitates communication between processes."""
import multiprocessing as mp
import queue
import os
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
import zmq
from frigate.comms.dispatcher import Communicator
from frigate.const import PORT_INTER_PROCESS_COMM
class InterProcessCommunicator(Communicator):
def __init__(self, queue: Queue) -> None:
self.queue = queue
def __init__(self) -> None:
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}")
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
@ -23,17 +32,41 @@ class InterProcessCommunicator(Communicator):
self.reader_thread.start()
def read(self) -> None:
while not self.stop_event.is_set():
while not self.stop_event.wait(0.5):
while True: # load all messages that are queued
try:
(
topic,
value,
) = self.queue.get(True, 1)
except queue.Empty:
continue
(topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
self._dispatcher(topic, value)
response = self._dispatcher(topic, value)
if response is not None:
self.socket.send_pyobj(response)
else:
self.socket.send_pyobj([])
except zmq.ZMQError:
break
def stop(self) -> None:
self.stop_event.set()
self.reader_thread.join()
self.socket.close()
self.context.destroy()
class InterProcessRequestor:
"""Simplifies sending data to InterProcessCommunicator and getting a reply."""
def __init__(self) -> None:
port = os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(f"tcp://127.0.0.1:{port}")
def send_data(self, topic: str, data: any) -> any:
"""Sends data and then waits for reply."""
self.socket.send_pyobj((topic, data))
return self.socket.recv_pyobj()
def stop(self) -> None:
self.socket.close()
self.context.destroy()

View File

@ -887,6 +887,10 @@ class CameraConfig(FrigateBaseModel):
global_args = get_ffmpeg_arg_list(
ffmpeg_input.global_args or self.ffmpeg.global_args
)
camera_arg = (
self.ffmpeg.hwaccel_args if self.ffmpeg.hwaccel_args != "auto" else None
)
hwaccel_args = get_ffmpeg_arg_list(
parse_preset_hardware_acceleration_decode(
ffmpeg_input.hwaccel_args,
@ -896,12 +900,13 @@ class CameraConfig(FrigateBaseModel):
)
or ffmpeg_input.hwaccel_args
or parse_preset_hardware_acceleration_decode(
self.ffmpeg.hwaccel_args,
camera_arg,
self.detect.fps,
self.detect.width,
self.detect.height,
)
or self.ffmpeg.hwaccel_args
or camera_arg
or []
)
input_args = get_ffmpeg_arg_list(
parse_preset_input(ffmpeg_input.input_args, self.detect.fps)

View File

@ -57,6 +57,10 @@ DRIVER_AMD = "radeonsi"
DRIVER_INTEL_i965 = "i965"
DRIVER_INTEL_iHD = "iHD"
# Ports
PORT_INTER_PROCESS_COMM = 4892
# Record Values
CACHE_SEGMENT_FORMAT = "%Y%m%d%H%M%S%z"

View File

@ -13,7 +13,7 @@ import numpy as np
import requests
from setproctitle import setproctitle
from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig
from frigate.const import (
AUDIO_DURATION,
@ -70,7 +70,6 @@ def listen_to_audio(
recordings_info_queue: mp.Queue,
camera_metrics: dict[str, CameraMetricsTypes],
process_info: dict[str, FeatureMetricsTypes],
inter_process_communicator: InterProcessCommunicator,
) -> None:
stop_event = mp.Event()
audio_threads: list[threading.Thread] = []
@ -100,7 +99,6 @@ def listen_to_audio(
camera_metrics,
process_info,
stop_event,
inter_process_communicator,
)
audio_threads.append(audio)
audio.start()
@ -174,7 +172,6 @@ class AudioEventMaintainer(threading.Thread):
camera_metrics: dict[str, CameraMetricsTypes],
feature_metrics: dict[str, FeatureMetricsTypes],
stop_event: mp.Event,
inter_process_communicator: InterProcessCommunicator,
) -> None:
threading.Thread.__init__(self)
self.name = f"{camera.name}_audio_event_processor"
@ -182,7 +179,6 @@ class AudioEventMaintainer(threading.Thread):
self.recordings_info_queue = recordings_info_queue
self.camera_metrics = camera_metrics
self.feature_metrics = feature_metrics
self.inter_process_communicator = inter_process_communicator
self.detections: dict[dict[str, any]] = {}
self.stop_event = stop_event
self.detector = AudioTfl(stop_event, self.config.audio.num_threads)
@ -193,6 +189,9 @@ class AudioEventMaintainer(threading.Thread):
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio")
self.audio_listener = None
# create communication for audio detections
self.requestor = InterProcessRequestor()
def detect_audio(self, audio) -> None:
if not self.feature_metrics[self.config.name]["audio_enabled"].value:
return
@ -245,12 +244,8 @@ class AudioEventMaintainer(threading.Thread):
else:
dBFS = 0
self.inter_process_communicator.queue.put(
(f"{self.config.name}/audio/dBFS", float(dBFS))
)
self.inter_process_communicator.queue.put(
(f"{self.config.name}/audio/rms", float(rms))
)
self.requestor.send_data(f"{self.config.name}/audio/dBFS", float(dBFS))
self.requestor.send_data(f"{self.config.name}/audio/rms", float(rms))
return float(rms), float(dBFS)
@ -260,9 +255,7 @@ class AudioEventMaintainer(threading.Thread):
"last_detection"
] = datetime.datetime.now().timestamp()
else:
self.inter_process_communicator.queue.put(
(f"{self.config.name}/audio/{label}", "ON")
)
self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON")
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{self.config.name}/{label}/create",
@ -288,8 +281,8 @@ class AudioEventMaintainer(threading.Thread):
now - detection.get("last_detection", now)
> self.config.audio.max_not_heard
):
self.inter_process_communicator.queue.put(
(f"{self.config.name}/audio/{detection['label']}", "OFF")
self.requestor.send_data(
f"{self.config.name}/audio/{detection['label']}", "OFF"
)
resp = requests.put(
@ -350,3 +343,4 @@ class AudioEventMaintainer(threading.Thread):
stop_ffmpeg(self.audio_listener, self.logger)
self.logpipe.close()
self.requestor.stop()

View File

@ -175,7 +175,7 @@ def parse_preset_hardware_acceleration_scale(
if not isinstance(arg, str) or " " in arg:
scale = PRESETS_HW_ACCEL_SCALE["default"]
else:
scale = PRESETS_HW_ACCEL_SCALE.get(arg, "")
scale = PRESETS_HW_ACCEL_SCALE.get(arg, PRESETS_HW_ACCEL_SCALE["default"])
scale = scale.format(fps, width, height).split(" ")
scale.extend(detect_args)

View File

@ -30,7 +30,6 @@ logger = logging.getLogger(__name__)
def output_frames(
config: FrigateConfig,
video_output_queue: mp.Queue,
inter_process_queue: mp.Queue,
camera_metrics: dict[str, CameraMetricsTypes],
):
threading.current_thread().name = "output"
@ -68,7 +67,7 @@ def output_frames(
continue
jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server)
preview_recorders[camera] = PreviewRecorder(cam_config, inter_process_queue)
preview_recorders[camera] = PreviewRecorder(cam_config)
if config.birdseye.enabled:
birdseye = Birdseye(

View File

@ -2,7 +2,6 @@
import datetime
import logging
import multiprocessing as mp
import os
import shutil
import subprocess as sp
@ -12,6 +11,7 @@ from pathlib import Path
import cv2
import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, RecordQualityEnum
from frigate.const import CACHE_DIR, CLIPS_DIR, INSERT_PREVIEW
from frigate.ffmpeg_presets import (
@ -53,13 +53,13 @@ class FFMpegConverter(threading.Thread):
self,
config: CameraConfig,
frame_times: list[float],
inter_process_queue: mp.Queue,
requestor: InterProcessRequestor,
):
threading.Thread.__init__(self)
self.name = f"{config.name}_preview_converter"
self.config = config
self.frame_times = frame_times
self.inter_process_queue = inter_process_queue
self.requestor = requestor
self.path = os.path.join(
CLIPS_DIR,
f"previews/{self.config.name}/{self.frame_times[0]}-{self.frame_times[-1]}.mp4",
@ -105,8 +105,7 @@ class FFMpegConverter(threading.Thread):
if p.returncode == 0:
logger.debug("successfully saved preview")
self.inter_process_queue.put_nowait(
(
self.requestor.send_data(
INSERT_PREVIEW,
{
Previews.id: f"{self.config.name}_{end}",
@ -117,7 +116,6 @@ class FFMpegConverter(threading.Thread):
Previews.duration: end - start,
},
)
)
else:
logger.error(f"Error saving preview for {self.config.name} :: {p.stderr}")
@ -128,9 +126,8 @@ class FFMpegConverter(threading.Thread):
class PreviewRecorder:
def __init__(self, config: CameraConfig, inter_process_queue: mp.Queue) -> None:
def __init__(self, config: CameraConfig) -> None:
self.config = config
self.inter_process_queue = inter_process_queue
self.start_time = 0
self.last_output_time = 0
self.output_frames = []
@ -139,6 +136,9 @@ class PreviewRecorder:
int((config.detect.width / config.detect.height) * self.out_height) // 4 * 4
)
# create communication for finished previews
self.requestor = InterProcessRequestor()
y, u1, u2, v1, v2 = get_yuv_crop(
self.config.frame_shape_yuv,
(
@ -237,7 +237,7 @@ class PreviewRecorder:
FFMpegConverter(
self.config,
self.output_frames,
self.inter_process_queue,
self.requestor,
).start()
# reset frame cache
@ -262,3 +262,5 @@ class PreviewRecorder:
shutil.rmtree(os.path.join(CACHE_DIR, FOLDER_PREVIEW_FRAMES))
except FileNotFoundError:
pass
self.requestor.stop()

View File

@ -17,6 +17,7 @@ from typing import Any, Optional, Tuple
import numpy as np
import psutil
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import (
CACHE_DIR,
@ -59,7 +60,6 @@ class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
inter_process_queue: mp.Queue,
object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: Optional[mp.Queue],
process_info: dict[str, FeatureMetricsTypes],
@ -68,7 +68,10 @@ class RecordingMaintainer(threading.Thread):
threading.Thread.__init__(self)
self.name = "recording_maintainer"
self.config = config
self.inter_process_queue = inter_process_queue
# create communication for retained recordings
self.requestor = InterProcessRequestor()
self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue
self.process_info = process_info
@ -183,8 +186,9 @@ class RecordingMaintainer(threading.Thread):
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
# fire and forget recordings entries
self.inter_process_queue.put(
(INSERT_MANY_RECORDINGS, [r for r in recordings_to_insert if r is not None])
self.requestor.send_data(
INSERT_MANY_RECORDINGS,
[r for r in recordings_to_insert if r is not None],
)
async def validate_and_move_segment(
@ -525,4 +529,5 @@ class RecordingMaintainer(threading.Thread):
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)
self.requestor.stop()
logger.info("Exiting recording maintenance...")

View File

@ -21,7 +21,6 @@ logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
inter_process_queue: mp.Queue,
object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: mp.Queue,
process_info: dict[str, FeatureMetricsTypes],
@ -52,7 +51,6 @@ def manage_recordings(
maintainer = RecordingMaintainer(
config,
inter_process_queue,
object_recordings_info_queue,
audio_recordings_info_queue,
process_info,

View File

@ -11,6 +11,7 @@ import time
import cv2
from setproctitle import setproctitle
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, ModelConfig
from frigate.const import (
ALL_ATTRIBUTE_LABELS,
@ -388,7 +389,6 @@ def track_camera(
detection_queue,
result_connection,
detected_objects_queue,
inter_process_queue,
process_info,
ptz_metrics,
region_grid,
@ -406,7 +406,6 @@ def track_camera(
listen()
frame_queue = process_info["frame_queue"]
region_grid_queue = process_info["region_grid_queue"]
detection_enabled = process_info["detection_enabled"]
motion_enabled = process_info["motion_enabled"]
improve_contrast_enabled = process_info["improve_contrast_enabled"]
@ -433,11 +432,13 @@ def track_camera(
frame_manager = SharedMemoryFrameManager()
# create communication for region grid updates
requestor = InterProcessRequestor()
process_frames(
name,
inter_process_queue,
requestor,
frame_queue,
region_grid_queue,
frame_shape,
model_config,
config.detect,
@ -505,9 +506,8 @@ def detect(
def process_frames(
camera_name: str,
inter_process_queue: mp.Queue,
requestor: InterProcessRequestor,
frame_queue: mp.Queue,
region_grid_queue: mp.Queue,
frame_shape,
model_config: ModelConfig,
detect_config: DetectConfig,
@ -544,13 +544,7 @@ def process_frames(
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
inter_process_queue.put((REQUEST_REGION_GRID, camera_name))
try:
region_grid = region_grid_queue.get(True, 10)
except queue.Empty:
logger.error(f"Unable to get updated region grid for {camera_name}")
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_name)
next_region_update = get_tomorrow_at_time(2)
try:
@ -826,3 +820,5 @@ def process_frames(
)
detection_fps.value = object_detector.fps.eps()
frame_manager.close(f"{camera_name}{frame_time}")
requestor.stop()