Fix queues (#7087)

* Fix queues

* Change name

* Use standard queue for limited queue

* remove unused

* isort
This commit is contained in:
Nicolas Mowen 2023-07-08 05:46:31 -06:00 committed by GitHub
parent 00b9a490bb
commit d6f82f9edc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 62 deletions

View File

@ -26,6 +26,7 @@ from frigate.const import (
CLIPS_DIR, CLIPS_DIR,
CONFIG_DIR, CONFIG_DIR,
DEFAULT_DB_PATH, DEFAULT_DB_PATH,
DEFAULT_QUEUE_BUFFER_SIZE,
EXPORT_DIR, EXPORT_DIR,
MODEL_CACHE_DIR, MODEL_CACHE_DIR,
RECORD_DIR, RECORD_DIR,
@ -47,7 +48,6 @@ from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util.builtin import LimitedQueue as LQueue
from frigate.version import VERSION from frigate.version import VERSION
from frigate.video import capture_camera, track_camera from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog from frigate.watchdog import FrigateWatchdog
@ -158,7 +158,7 @@ class FrigateApp:
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards
"frame_queue": LQueue(maxsize=2), "frame_queue": mp.Queue(maxsize=2),
"capture_process": None, "capture_process": None,
"process": None, "process": None,
} }
@ -190,22 +190,22 @@ class FrigateApp:
def init_queues(self) -> None: def init_queues(self) -> None:
# Queues for clip processing # Queues for clip processing
self.event_queue: Queue = ff.Queue() self.event_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
self.event_processed_queue: Queue = ff.Queue() self.event_processed_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
self.video_output_queue: Queue = LQueue( self.video_output_queue: Queue = mp.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = LQueue( self.detected_frames_queue: Queue = mp.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for recordings info # Queue for recordings info
self.recordings_info_queue: Queue = ff.Queue() self.recordings_info_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
# Queue for timeline events # Queue for timeline events
self.timeline_queue: Queue = ff.Queue() self.timeline_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
def init_database(self) -> None: def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None: def vacuum_db(db: SqliteExtDatabase) -> None:

View File

@ -46,3 +46,7 @@ DRIVER_INTEL_iHD = "iHD"
MAX_SEGMENT_DURATION = 600 MAX_SEGMENT_DURATION = 600
MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times
# Queue Values
DEFAULT_QUEUE_BUFFER_SIZE = 2000 * 1000 # 2MB

View File

@ -1,24 +1,18 @@
"""Utilities for builtin types manipulation.""" """Utilities for builtin types manipulation."""
import copy import copy
import ctypes
import datetime import datetime
import logging import logging
import multiprocessing
import re import re
import shlex import shlex
import time
import urllib.parse import urllib.parse
from collections import Counter from collections import Counter
from collections.abc import Mapping from collections.abc import Mapping
from queue import Empty, Full
from typing import Any, Tuple from typing import Any, Tuple
import numpy as np import numpy as np
import pytz import pytz
import yaml import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from ruamel.yaml import YAML from ruamel.yaml import YAML
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
@ -65,54 +59,6 @@ class EventsPerSecond:
del self._timestamps[0] del self._timestamps[0]
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
with self.lock: # Ensure thread-safety
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
item = super().get(block=block, timeout=timeout)
with self.lock: # Ensure thread-safety
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return item
def qsize(self):
return self.size.value
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict: def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
""" """
:param dct1: First dict to merge :param dct1: First dict to merge