diff --git a/frigate/app.py b/frigate/app.py index 4d4aa5dd4..ed32dee17 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -10,6 +10,7 @@ from multiprocessing.synchronize import Event as MpEvent from types import FrameType from typing import Optional +import faster_fifo as ff import psutil from faster_fifo import Queue from peewee_migrate import Router @@ -46,6 +47,7 @@ from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.types import CameraMetricsTypes, FeatureMetricsTypes +from frigate.util import LimitedQueue as LQueue from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog @@ -56,11 +58,11 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() - self.detection_queue: Queue = mp.Queue() + self.detection_queue: Queue = ff.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] - self.log_queue: Queue = mp.Queue() + self.log_queue: Queue = ff.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} self.feature_metrics: dict[str, FeatureMetricsTypes] = {} @@ -156,7 +158,7 @@ class FrigateApp: "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards - "frame_queue": mp.Queue(maxsize=2), + "frame_queue": LQueue(maxsize=2), "capture_process": None, "process": None, } @@ -188,22 +190,22 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() - self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( + self.event_queue: Queue = ff.Queue() + self.event_processed_queue: Queue = ff.Queue() + self.video_output_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = mp.Queue( + self.detected_frames_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = mp.Queue() + self.recordings_info_queue: Queue = ff.Queue() # Queue for timeline events - self.timeline_queue: Queue = mp.Queue() + self.timeline_queue: Queue = ff.Queue() def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 0a2a7059c..cebd7ff41 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -7,6 +7,7 @@ import signal import threading from abc import ABC, abstractmethod +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -72,7 +73,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, - detection_queue: mp.Queue, + detection_queue: ff.Queue, out_events: dict[str, mp.Event], avg_speed, start, diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 8e40fc6e7..b4208f2d2 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -3,7 +3,6 @@ import asyncio import datetime import logging -import multiprocessing as mp import os import queue import random @@ -15,6 +14,7 @@ from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Tuple +import faster_fifo as ff import psutil from frigate.config import FrigateConfig, RetainModeEnum @@ -30,7 +30,7 @@ class RecordingMaintainer(threading.Thread): def __init__( self, config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], stop_event: MpEvent, ): diff --git a/frigate/record/record.py b/frigate/record/record.py index 530adc031..0d22342aa 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -7,6 +7,7 @@ import threading from types import FrameType from typing import Optional +import faster_fifo as ff from playhouse.sqliteq import SqliteQueueDatabase from setproctitle import setproctitle @@ -22,7 +23,7 @@ logger = logging.getLogger(__name__) def manage_recordings( config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], ) -> None: stop_event = mp.Event() diff --git a/frigate/util.py b/frigate/util.py index f535a9572..cc9bb03c9 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,18 +1,22 @@ import copy +import ctypes import datetime import json import logging +import multiprocessing import os import re import shlex import signal import subprocess as sp +import time import traceback import urllib.parse from abc import ABC, abstractmethod from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory +from queue import Empty, Full from typing import Any, AnyStr, Optional, Tuple import cv2 @@ -21,6 +25,8 @@ import psutil import py3nvml.py3nvml as nvml import pytz import yaml +from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT +from faster_fifo import Queue as FFQueue from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -1218,3 +1224,47 @@ def get_video_properties(url, get_duration=False): result["height"] = round(height) return result + + +class LimitedQueue(FFQueue): + def __init__( + self, + maxsize=0, + max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, + loads=None, + dumps=None, + ): + super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.maxsize = maxsize + self.size = multiprocessing.RawValue( + ctypes.c_int, 0 + ) # Add a counter for the number of items in the queue + + def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 + return super().put(x, block=block, timeout=timeout) + + def get(self, block=True, timeout=DEFAULT_TIMEOUT): + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return super().get(block=block, timeout=timeout) + + def qsize(self): + return self.size + + def empty(self): + return self.qsize() == 0 + + def full(self): + return self.qsize() == self.maxsize diff --git a/frigate/video.py b/frigate/video.py index 2ba82a577..5928a84d5 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import time from collections import defaultdict import cv2 +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -727,7 +728,7 @@ def get_consolidated_object_detections(detected_object_groups): def process_frames( camera_name: str, - frame_queue: mp.Queue, + frame_queue: ff.Queue, frame_shape, model_config: ModelConfig, detect_config: DetectConfig, @@ -735,7 +736,7 @@ def process_frames( motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, - detected_objects_queue: mp.Queue, + detected_objects_queue: ff.Queue, process_info: dict, objects_to_track: list[str], object_filters,