diff --git a/frigate/__main__.py b/frigate/__main__.py index 0d9a3dbe2..b36881151 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,14 +1,13 @@ import faulthandler +from flask import cli faulthandler.enable() -import sys import threading threading.current_thread().name = "frigate" from frigate.app import FrigateApp -cli = sys.modules["flask.cli"] cli.show_server_banner = lambda *x: None if __name__ == "__main__": diff --git a/frigate/app.py b/frigate/app.py index be581364f..9cb315bb4 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -1,12 +1,16 @@ import json import logging import multiprocessing as mp +from multiprocessing.queues import Queue +from multiprocessing.synchronize import Event +from multiprocessing.context import Process import os import signal import sys import threading from logging.handlers import QueueHandler -from typing import Dict, List +from typing import Optional +from types import FrameType import traceback import yaml @@ -16,7 +20,7 @@ from playhouse.sqliteq import SqliteQueueDatabase from pydantic import ValidationError from frigate.config import DetectorTypeEnum, FrigateConfig -from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR, PLUS_ENV_VAR, PLUS_API_HOST +from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.edgetpu import EdgeTPUProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app @@ -31,32 +35,27 @@ from frigate.stats import StatsEmitter, stats_init from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog +from frigate.types import CameraMetricsTypes logger = logging.getLogger(__name__) class FrigateApp: - def __init__(self): - self.stop_event = mp.Event() - self.base_config: FrigateConfig = None - self.config: FrigateConfig = None - self.detection_queue = mp.Queue() - self.detectors: Dict[str, EdgeTPUProcess] = {} - self.detection_out_events: Dict[str, mp.Event] = {} - self.detection_shms: List[mp.shared_memory.SharedMemory] = [] - self.log_queue = mp.Queue() - self.plus_api = ( - PlusApi(PLUS_API_HOST, os.environ.get(PLUS_ENV_VAR)) - if PLUS_ENV_VAR in os.environ - else None - ) - self.camera_metrics = {} + def __init__(self) -> None: + self.stop_event: Event = mp.Event() + self.detection_queue: Queue = mp.Queue() + self.detectors: dict[str, EdgeTPUProcess] = {} + self.detection_out_events: dict[str, Event] = {} + self.detection_shms: list[mp.shared_memory.SharedMemory] = [] + self.log_queue: Queue = mp.Queue() + self.plus_api = PlusApi() + self.camera_metrics: dict[str, CameraMetricsTypes] = {} - def set_environment_vars(self): + def set_environment_vars(self) -> None: for key, value in self.config.environment_vars.items(): os.environ[key] = value - def ensure_dirs(self): + def ensure_dirs(self) -> None: for d in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]: if not os.path.exists(d) and not os.path.islink(d): logger.info(f"Creating directory: {d}") @@ -64,7 +63,7 @@ class FrigateApp: else: logger.debug(f"Skipping directory: {d}") - def init_logger(self): + def init_logger(self) -> None: self.log_process = mp.Process( target=log_process, args=(self.log_queue,), name="log_process" ) @@ -72,7 +71,7 @@ class FrigateApp: self.log_process.start() root_configurer(self.log_queue) - def init_config(self): + def init_config(self) -> None: config_file = os.environ.get("CONFIG_FILE", "/config/config.yml") # Check if we can use .yaml instead of .yml @@ -100,9 +99,11 @@ class FrigateApp: "read_start": mp.Value("d", 0.0), "ffmpeg_pid": mp.Value("i", 0), "frame_queue": mp.Queue(maxsize=2), + "capture_process": None, + "process": None, } - def set_log_levels(self): + def set_log_levels(self) -> None: logging.getLogger().setLevel(self.config.logger.default.value.upper()) for log, level in self.config.logger.logs.items(): logging.getLogger(log).setLevel(level.value.upper()) @@ -110,21 +111,23 @@ class FrigateApp: if not "werkzeug" in self.config.logger.logs: logging.getLogger("werkzeug").setLevel("ERROR") - def init_queues(self): + def init_queues(self) -> None: # Queues for clip processing - self.event_queue = mp.Queue() - self.event_processed_queue = mp.Queue() - self.video_output_queue = mp.Queue(maxsize=len(self.config.cameras.keys()) * 2) + self.event_queue: Queue = mp.Queue() + self.event_processed_queue: Queue = mp.Queue() + self.video_output_queue: Queue = mp.Queue( + maxsize=len(self.config.cameras.keys()) * 2 + ) # Queue for cameras to push tracked objects to - self.detected_frames_queue = mp.Queue( + self.detected_frames_queue: Queue = mp.Queue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue = mp.Queue() + self.recordings_info_queue: Queue = mp.Queue() - def init_database(self): + def init_database(self) -> None: # Migrate DB location old_db_path = os.path.join(CLIPS_DIR, "frigate.db") if not os.path.isfile(self.config.database.path) and os.path.isfile( @@ -146,10 +149,10 @@ class FrigateApp: models = [Event, Recordings] self.db.bind(models) - def init_stats(self): + def init_stats(self) -> None: self.stats_tracking = stats_init(self.camera_metrics, self.detectors) - def init_web_server(self): + def init_web_server(self) -> None: self.flask_app = create_app( self.config, self.db, @@ -158,16 +161,16 @@ class FrigateApp: self.plus_api, ) - def init_mqtt(self): + def init_mqtt(self) -> None: self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics) - def start_mqtt_relay(self): + def start_mqtt_relay(self) -> None: self.mqtt_relay = MqttSocketRelay( self.mqtt_client, self.config.mqtt.topic_prefix ) self.mqtt_relay.start() - def start_detectors(self): + def start_detectors(self) -> None: model_path = self.config.model.path model_shape = (self.config.model.height, self.config.model.width) for name in self.config.cameras.keys(): @@ -214,7 +217,7 @@ class FrigateApp: detector.num_threads, ) - def start_detected_frames_processor(self): + def start_detected_frames_processor(self) -> None: self.detected_frames_processor = TrackedObjectProcessor( self.config, self.mqtt_client, @@ -228,7 +231,7 @@ class FrigateApp: ) self.detected_frames_processor.start() - def start_video_output_processor(self): + def start_video_output_processor(self) -> None: output_processor = mp.Process( target=output_frames, name=f"output_processor", @@ -242,7 +245,7 @@ class FrigateApp: output_processor.start() logger.info(f"Output process started: {output_processor.pid}") - def start_camera_processors(self): + def start_camera_processors(self) -> None: model_shape = (self.config.model.height, self.config.model.width) for name, config in self.config.cameras.items(): camera_process = mp.Process( @@ -264,7 +267,7 @@ class FrigateApp: camera_process.start() logger.info(f"Camera processor started for {name}: {camera_process.pid}") - def start_camera_capture_processes(self): + def start_camera_capture_processes(self) -> None: for name, config in self.config.cameras.items(): capture_process = mp.Process( target=capture_camera, @@ -276,7 +279,7 @@ class FrigateApp: capture_process.start() logger.info(f"Capture process started for {name}: {capture_process.pid}") - def start_event_processor(self): + def start_event_processor(self) -> None: self.event_processor = EventProcessor( self.config, self.camera_metrics, @@ -286,21 +289,21 @@ class FrigateApp: ) self.event_processor.start() - def start_event_cleanup(self): + def start_event_cleanup(self) -> None: self.event_cleanup = EventCleanup(self.config, self.stop_event) self.event_cleanup.start() - def start_recording_maintainer(self): + def start_recording_maintainer(self) -> None: self.recording_maintainer = RecordingMaintainer( self.config, self.recordings_info_queue, self.stop_event ) self.recording_maintainer.start() - def start_recording_cleanup(self): + def start_recording_cleanup(self) -> None: self.recording_cleanup = RecordingCleanup(self.config, self.stop_event) self.recording_cleanup.start() - def start_stats_emitter(self): + def start_stats_emitter(self) -> None: self.stats_emitter = StatsEmitter( self.config, self.stats_tracking, @@ -310,11 +313,11 @@ class FrigateApp: ) self.stats_emitter.start() - def start_watchdog(self): + def start_watchdog(self) -> None: self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event) self.frigate_watchdog.start() - def start(self): + def start(self) -> None: self.init_logger() logger.info(f"Starting Frigate ({VERSION})") try: @@ -363,7 +366,7 @@ class FrigateApp: self.start_watchdog() # self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id) - def receiveSignal(signalNumber, frame): + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: self.stop() sys.exit() @@ -376,7 +379,7 @@ class FrigateApp: self.stop() - def stop(self): + def stop(self) -> None: logger.info(f"Stopping...") self.stop_event.set() diff --git a/frigate/http.py b/frigate/http.py index c3ae95d22..0b7d3deb0 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -141,7 +141,7 @@ def set_retain(id): @bp.route("/events//plus", methods=("POST",)) def send_to_plus(id): - if current_app.plus_api is None: + if current_app.plus_api.is_active(): return make_response( jsonify( { diff --git a/frigate/models.py b/frigate/models.py index 05f4f6d82..8df85b7d8 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -1,9 +1,18 @@ from numpy import unique -from peewee import * +from peewee import ( + Model, + CharField, + DateTimeField, + FloatField, + BooleanField, + JSONField, + TextField, + IntegerField, +) from playhouse.sqlite_ext import * -class Event(Model): +class Event(Model): # type: ignore[misc] id = CharField(null=False, primary_key=True, max_length=30) label = CharField(index=True, max_length=20) sub_label = CharField(max_length=20, null=True) @@ -24,7 +33,7 @@ class Event(Model): plus_id = CharField(max_length=30) -class Recordings(Model): +class Recordings(Model): # type: ignore[misc] id = CharField(null=False, primary_key=True, max_length=30) camera = CharField(index=True, max_length=20) path = CharField(unique=True) diff --git a/frigate/mypy.ini b/frigate/mypy.ini index d8c34b966..c789241d0 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -1,7 +1,7 @@ [mypy] python_version = 3.9 show_error_codes = true -follow_imports = silent +follow_imports = normal ignore_missing_imports = true strict_equality = true warn_incomplete_stub = true @@ -23,12 +23,32 @@ no_implicit_reexport = true [mypy-frigate.*] ignore_errors = true +[mypy-frigate.__main__] +ignore_errors = false +disallow_untyped_calls = false + +[mypy-frigate.app] +ignore_errors = false +disallow_untyped_calls = false + [mypy-frigate.const] ignore_errors = false [mypy-frigate.log] ignore_errors = false +[mypy-frigate.models] +ignore_errors = false + +[mypy-frigate.plus] +ignore_errors = false + +[mypy-frigate.stats] +ignore_errors = false + +[mypy-frigate.types] +ignore_errors = false + [mypy-frigate.version] ignore_errors = false diff --git a/frigate/plus.py b/frigate/plus.py index 5e74e7592..4e81ab393 100644 --- a/frigate/plus.py +++ b/frigate/plus.py @@ -1,12 +1,16 @@ import datetime import logging +import os import requests +from frigate.const import PLUS_ENV_VAR, PLUS_API_HOST +from requests.models import Response import cv2 +from numpy import ndarray logger = logging.getLogger(__name__) -def get_jpg_bytes(image, max_dim, quality): +def get_jpg_bytes(image: ndarray, max_dim: int, quality: int) -> bytes: if image.shape[1] >= image.shape[0]: width = min(max_dim, image.shape[1]) height = int(width * image.shape[0] / image.shape[1]) @@ -17,45 +21,54 @@ def get_jpg_bytes(image, max_dim, quality): original = cv2.resize(image, dsize=(width, height), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode(".jpg", original, [int(cv2.IMWRITE_JPEG_QUALITY), quality]) - return jpg.tobytes() + jpg_bytes = jpg.tobytes() + return jpg_bytes if type(jpg_bytes) is bytes else b"" class PlusApi: - def __init__(self, host, key: str): - self.host = host - self.key = key - self._token_data = None + def __init__(self) -> None: + self.host = PLUS_API_HOST + if PLUS_ENV_VAR in os.environ: + self.key = os.environ.get(PLUS_ENV_VAR) + else: + self.key = None + self._is_active: bool = self.key is not None + self._token_data: dict = {} - def _refresh_token_if_needed(self): + def _refresh_token_if_needed(self) -> None: if ( - self._token_data is None + self._token_data.get("expires") is None or self._token_data["expires"] - datetime.datetime.now().timestamp() < 60 ): + if self.key is None: + raise Exception("Plus API not activated") parts = self.key.split(":") r = requests.get(f"{self.host}/v1/auth/token", auth=(parts[0], parts[1])) self._token_data = r.json() - def _get_authorization_header(self): + def _get_authorization_header(self) -> dict: self._refresh_token_if_needed() - return {"authorization": f"Bearer {self._token_data['accessToken']}"} + return {"authorization": f"Bearer {self._token_data.get('accessToken')}"} - def _get(self, path): + def _get(self, path: str) -> Response: return requests.get( f"{self.host}/v1/{path}", headers=self._get_authorization_header() ) - def _post(self, path, data): + def _post(self, path: str, data: dict) -> Response: return requests.post( f"{self.host}/v1/{path}", headers=self._get_authorization_header(), json=data, ) - def upload_image(self, image, camera: str): + def is_active(self) -> bool: + return self._is_active + + def upload_image(self, image: ndarray, camera: str) -> str: r = self._get("image/signed_urls") presigned_urls = r.json() if not r.ok: - logger.exception(ex) raise Exception("Unable to get signed urls") # resize and submit original @@ -93,4 +106,4 @@ class PlusApi: raise Exception(r.text) # return image id - return presigned_urls["imageId"] + return str(presigned_urls.get("imageId")) diff --git a/frigate/stats.py b/frigate/stats.py index ed4797d32..4bbd5d6df 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -6,10 +6,15 @@ import psutil import shutil import os import requests +from typing import Optional, Any +from paho.mqtt.client import Client +from multiprocessing.synchronize import Event from frigate.config import FrigateConfig from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR +from frigate.types import StatsTrackingTypes, CameraMetricsTypes from frigate.version import VERSION +from frigate.edgetpu import EdgeTPUProcess logger = logging.getLogger(__name__) @@ -20,14 +25,16 @@ def get_latest_version() -> str: ) response = request.json() - if request.ok and response: - return response.get("tag_name", "unknown").replace("v", "") + if request.ok and response and "tag_name" in response: + return str(response.get("tag_name").replace("v", "")) else: return "unknown" -def stats_init(camera_metrics, detectors): - stats_tracking = { +def stats_init( + camera_metrics: dict[str, CameraMetricsTypes], detectors: dict[str, EdgeTPUProcess] +) -> StatsTrackingTypes: + stats_tracking: StatsTrackingTypes = { "camera_metrics": camera_metrics, "detectors": detectors, "started": int(time.time()), @@ -36,7 +43,7 @@ def stats_init(camera_metrics, detectors): return stats_tracking -def get_fs_type(path): +def get_fs_type(path: str) -> str: bestMatch = "" fsType = "" for part in psutil.disk_partitions(all=True): @@ -46,7 +53,7 @@ def get_fs_type(path): return fsType -def read_temperature(path): +def read_temperature(path: str) -> Optional[float]: if os.path.isfile(path): with open(path) as f: line = f.readline().strip() @@ -54,7 +61,7 @@ def read_temperature(path): return None -def get_temperatures(): +def get_temperatures() -> dict[str, float]: temps = {} # Get temperatures for all attached Corals @@ -68,29 +75,36 @@ def get_temperatures(): return temps -def stats_snapshot(stats_tracking): +def stats_snapshot(stats_tracking: StatsTrackingTypes) -> dict[str, Any]: camera_metrics = stats_tracking["camera_metrics"] - stats = {} + stats: dict[str, Any] = {} total_detection_fps = 0 for name, camera_stats in camera_metrics.items(): total_detection_fps += camera_stats["detection_fps"].value + pid = camera_stats["process"].pid if camera_stats["process"] else None + cpid = ( + camera_stats["capture_process"].pid + if camera_stats["capture_process"] + else None + ) stats[name] = { "camera_fps": round(camera_stats["camera_fps"].value, 2), "process_fps": round(camera_stats["process_fps"].value, 2), "skipped_fps": round(camera_stats["skipped_fps"].value, 2), "detection_fps": round(camera_stats["detection_fps"].value, 2), - "pid": camera_stats["process"].pid, - "capture_pid": camera_stats["capture_process"].pid, + "pid": pid, + "capture_pid": cpid, } stats["detectors"] = {} for name, detector in stats_tracking["detectors"].items(): + pid = detector.detect_process.pid if detector.detect_process else None stats["detectors"][name] = { "inference_speed": round(detector.avg_inference_speed.value * 1000, 2), "detection_start": detector.detection_start.value, - "pid": detector.detect_process.pid, + "pid": pid, } stats["detection_fps"] = round(total_detection_fps, 2) @@ -118,10 +132,10 @@ class StatsEmitter(threading.Thread): def __init__( self, config: FrigateConfig, - stats_tracking, - mqtt_client, - topic_prefix, - stop_event, + stats_tracking: StatsTrackingTypes, + mqtt_client: Client, + topic_prefix: str, + stop_event: Event, ): threading.Thread.__init__(self) self.name = "frigate_stats_emitter" @@ -131,7 +145,7 @@ class StatsEmitter(threading.Thread): self.topic_prefix = topic_prefix self.stop_event = stop_event - def run(self): + def run(self) -> None: time.sleep(10) while not self.stop_event.wait(self.config.mqtt.stats_interval): stats = stats_snapshot(self.stats_tracking) diff --git a/frigate/types.py b/frigate/types.py new file mode 100644 index 000000000..d65c601fb --- /dev/null +++ b/frigate/types.py @@ -0,0 +1,28 @@ +from typing import Optional, TypedDict +from multiprocessing.queues import Queue +from multiprocessing.sharedctypes import Synchronized +from multiprocessing.context import Process + +from frigate.edgetpu import EdgeTPUProcess + + +class CameraMetricsTypes(TypedDict): + camera_fps: Synchronized + capture_process: Optional[Process] + detection_enabled: Synchronized + detection_fps: Synchronized + detection_frame: Synchronized + ffmpeg_pid: Synchronized + frame_queue: Queue + improve_contrast_enabled: Synchronized + process: Optional[Process] + process_fps: Synchronized + read_start: Synchronized + skipped_fps: Synchronized + + +class StatsTrackingTypes(TypedDict): + camera_metrics: dict[str, CameraMetricsTypes] + detectors: dict[str, EdgeTPUProcess] + started: int + latest_frigate_version: str diff --git a/frigate/watchdog.py b/frigate/watchdog.py index da514e760..9f8b3f8d9 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -8,7 +8,6 @@ import signal from frigate.edgetpu import EdgeTPUProcess from frigate.util import restart_frigate from multiprocessing.synchronize import Event -from typing import dict logger = logging.getLogger(__name__) diff --git a/requirements-wheels.txt b/requirements-wheels.txt index 94786bf53..2dd2356cc 100644 --- a/requirements-wheels.txt +++ b/requirements-wheels.txt @@ -11,7 +11,9 @@ peewee_migrate == 1.4.* psutil == 5.9.* pydantic == 1.9.* PyYAML == 6.0.* +types-PyYAML == 6.0.* requests == 2.27.* +types-requests == 2.27.* scipy == 1.8.* setproctitle == 1.2.* ws4py == 0.5.*