Performance: multiprocessing improvement: step 2 (#6986)

* Refactored queues to use faster_fifo instead of mp.Queue

* Refactored LimitedQueue to include a counter for the number of items in the queue and updated put and get methods to use the counter

* Refactor app.py and util.py to use a custom Queue implementation called LQueue instead of the existing Queue

* Refactor put and get methods in LimitedQueue to handle queue size and blocking behavior more efficiently

* code format

* remove code from other branch (merging fuckup)
This commit is contained in:
Sergey Krashevich 2023-07-06 15:56:38 +03:00 committed by GitHub
parent d0891e5183
commit c38c981cd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 70 additions and 15 deletions

View File

@ -10,6 +10,7 @@ from multiprocessing.synchronize import Event as MpEvent
from types import FrameType
from typing import Optional
import faster_fifo as ff
import psutil
from faster_fifo import Queue
from peewee_migrate import Router
@ -46,6 +47,7 @@ from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util import LimitedQueue as LQueue
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog
@ -56,11 +58,11 @@ logger = logging.getLogger(__name__)
class FrigateApp:
def __init__(self) -> None:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detection_queue: Queue = ff.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.log_queue: Queue = ff.Queue()
self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.feature_metrics: dict[str, FeatureMetricsTypes] = {}
@ -156,7 +158,7 @@ class FrigateApp:
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"frame_queue": mp.Queue(maxsize=2),
"frame_queue": LQueue(maxsize=2),
"capture_process": None,
"process": None,
}
@ -188,22 +190,22 @@ class FrigateApp:
def init_queues(self) -> None:
# Queues for clip processing
self.event_queue: Queue = mp.Queue()
self.event_processed_queue: Queue = mp.Queue()
self.video_output_queue: Queue = mp.Queue(
self.event_queue: Queue = ff.Queue()
self.event_processed_queue: Queue = ff.Queue()
self.video_output_queue: Queue = LQueue(
maxsize=len(self.config.cameras.keys()) * 2
)
# Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue(
self.detected_frames_queue: Queue = LQueue(
maxsize=len(self.config.cameras.keys()) * 2
)
# Queue for recordings info
self.recordings_info_queue: Queue = mp.Queue()
self.recordings_info_queue: Queue = ff.Queue()
# Queue for timeline events
self.timeline_queue: Queue = mp.Queue()
self.timeline_queue: Queue = ff.Queue()
def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:

View File

@ -7,6 +7,7 @@ import signal
import threading
from abc import ABC, abstractmethod
import faster_fifo as ff
import numpy as np
from setproctitle import setproctitle
@ -72,7 +73,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector(
name: str,
detection_queue: mp.Queue,
detection_queue: ff.Queue,
out_events: dict[str, mp.Event],
avg_speed,
start,

View File

@ -3,7 +3,6 @@
import asyncio
import datetime
import logging
import multiprocessing as mp
import os
import queue
import random
@ -15,6 +14,7 @@ from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Tuple
import faster_fifo as ff
import psutil
from frigate.config import FrigateConfig, RetainModeEnum
@ -30,7 +30,7 @@ class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
recordings_info_queue: mp.Queue,
recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent,
):

View File

@ -7,6 +7,7 @@ import threading
from types import FrameType
from typing import Optional
import faster_fifo as ff
from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
@ -22,7 +23,7 @@ logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
recordings_info_queue: mp.Queue,
recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
) -> None:
stop_event = mp.Event()

View File

@ -1,18 +1,22 @@
import copy
import ctypes
import datetime
import json
import logging
import multiprocessing
import os
import re
import shlex
import signal
import subprocess as sp
import time
import traceback
import urllib.parse
from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory
from queue import Empty, Full
from typing import Any, AnyStr, Optional, Tuple
import cv2
@ -21,6 +25,8 @@ import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
@ -1218,3 +1224,47 @@ def get_video_properties(url, get_duration=False):
result["height"] = round(height)
return result
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return super().get(block=block, timeout=timeout)
def qsize(self):
return self.size
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize

View File

@ -11,6 +11,7 @@ import time
from collections import defaultdict
import cv2
import faster_fifo as ff
import numpy as np
from setproctitle import setproctitle
@ -727,7 +728,7 @@ def get_consolidated_object_detections(detected_object_groups):
def process_frames(
camera_name: str,
frame_queue: mp.Queue,
frame_queue: ff.Queue,
frame_shape,
model_config: ModelConfig,
detect_config: DetectConfig,
@ -735,7 +736,7 @@ def process_frames(
motion_detector: MotionDetector,
object_detector: RemoteObjectDetector,
object_tracker: ObjectTracker,
detected_objects_queue: mp.Queue,
detected_objects_queue: ff.Queue,
process_info: dict,
objects_to_track: list[str],
object_filters,