diff --git a/frigate/app.py b/frigate/app.py index 96c2f1d27..893f8fa50 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -26,6 +26,7 @@ from frigate.const import ( CLIPS_DIR, CONFIG_DIR, DEFAULT_DB_PATH, + DEFAULT_QUEUE_BUFFER_SIZE, EXPORT_DIR, MODEL_CACHE_DIR, RECORD_DIR, @@ -47,7 +48,6 @@ from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.types import CameraMetricsTypes, FeatureMetricsTypes -from frigate.util.builtin import LimitedQueue as LQueue from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog @@ -158,7 +158,7 @@ class FrigateApp: "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards - "frame_queue": LQueue(maxsize=2), + "frame_queue": mp.Queue(maxsize=2), "capture_process": None, "process": None, } @@ -190,22 +190,22 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = ff.Queue() - self.event_processed_queue: Queue = ff.Queue() - self.video_output_queue: Queue = LQueue( + self.event_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) + self.event_processed_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) + self.video_output_queue: Queue = mp.Queue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = LQueue( + self.detected_frames_queue: Queue = mp.Queue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = ff.Queue() + self.recordings_info_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) # Queue for timeline events - self.timeline_queue: Queue = ff.Queue() + self.timeline_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: diff --git a/frigate/const.py b/frigate/const.py index b6b0e44bd..c508a83bf 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -46,3 +46,7 @@ DRIVER_INTEL_iHD = "iHD" MAX_SEGMENT_DURATION = 600 MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times + +# Queue Values + +DEFAULT_QUEUE_BUFFER_SIZE = 2000 * 1000 # 2MB diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index 2f623567c..7eafc9d33 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -1,24 +1,18 @@ """Utilities for builtin types manipulation.""" import copy -import ctypes import datetime import logging -import multiprocessing import re import shlex -import time import urllib.parse from collections import Counter from collections.abc import Mapping -from queue import Empty, Full from typing import Any, Tuple import numpy as np import pytz import yaml -from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT -from faster_fifo import Queue as FFQueue from ruamel.yaml import YAML from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -65,54 +59,6 @@ class EventsPerSecond: del self._timestamps[0] -class LimitedQueue(FFQueue): - def __init__( - self, - maxsize=0, - max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, - loads=None, - dumps=None, - ): - super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) - self.maxsize = maxsize - self.size = multiprocessing.RawValue( - ctypes.c_int, 0 - ) # Add a counter for the number of items in the queue - self.lock = multiprocessing.Lock() # Add a lock for thread-safety - - def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): - with self.lock: # Ensure thread-safety - if self.maxsize > 0 and self.size.value >= self.maxsize: - if block: - start_time = time.time() - while self.size.value >= self.maxsize: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Full - time.sleep(min(remaining, 0.1)) - else: - raise Full - self.size.value += 1 - return super().put(x, block=block, timeout=timeout) - - def get(self, block=True, timeout=DEFAULT_TIMEOUT): - item = super().get(block=block, timeout=timeout) - with self.lock: # Ensure thread-safety - if self.size.value <= 0 and not block: - raise Empty - self.size.value -= 1 - return item - - def qsize(self): - return self.size.value - - def empty(self): - return self.qsize() == 0 - - def full(self): - return self.qsize() == self.maxsize - - def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict: """ :param dct1: First dict to merge