Typing Part 3: events.py (#3352)

* Typing: events.py

* Remove unused variable

* Fix return Any from return statement

Not all elements from the event dict are sure to be something that can be evaluated

See e.g.: https://github.com/python/mypy/issues/5697

* Sort out Event disambiguity

There was a name collision of multiprocessing Event type and frigate events

Co-authored-by: Sebastian Englbrecht <sebastian.englbrecht@kabelmail.de>
This commit is contained in:
herostrat 2022-11-19 14:16:33 +01:00 committed by GitHub
parent a04fa105ef
commit 2e81c94d8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 24 deletions

View File

@ -1,7 +1,7 @@
import logging import logging
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.queues import Queue from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event from multiprocessing.synchronize import Event as MpEvent
import os import os
import signal import signal
import sys import sys
@ -38,10 +38,10 @@ logger = logging.getLogger(__name__)
class FrigateApp: class FrigateApp:
def __init__(self) -> None: def __init__(self) -> None:
self.stop_event: Event = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, Event] = {} self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi() self.plus_api = PlusApi()

View File

@ -11,43 +11,55 @@ from peewee import fn
from frigate.config import EventsConfig, FrigateConfig, RecordConfig from frigate.config import EventsConfig, FrigateConfig, RecordConfig
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.models import Event from frigate.models import Event
from frigate.types import CameraMetricsTypes
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def should_insert_db(prev_event, current_event): def should_insert_db(prev_event: Event, current_event: Event) -> bool:
"""If current event has new clip or snapshot.""" """If current event has new clip or snapshot."""
return (not prev_event["has_clip"] and not prev_event["has_snapshot"]) and ( return (not prev_event["has_clip"] and not prev_event["has_snapshot"]) and (
current_event["has_clip"] or current_event["has_snapshot"] current_event["has_clip"] or current_event["has_snapshot"]
) )
def should_update_db(prev_event, current_event): def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot).""" """If current_event has updated fields and (clip or snapshot)."""
return (current_event["has_clip"] or current_event["has_snapshot"]) and ( if current_event["has_clip"] or current_event["has_snapshot"]:
prev_event["top_score"] != current_event["top_score"] if (
or prev_event["entered_zones"] != current_event["entered_zones"] prev_event["top_score"] != current_event["top_score"]
or prev_event["thumbnail"] != current_event["thumbnail"] or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["has_clip"] != current_event["has_clip"] or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_snapshot"] != current_event["has_snapshot"] or prev_event["has_clip"] != current_event["has_clip"]
) or prev_event["has_snapshot"] != current_event["has_snapshot"]
):
return True
return False
class EventProcessor(threading.Thread): class EventProcessor(threading.Thread):
def __init__( def __init__(
self, config, camera_processes, event_queue, event_processed_queue, stop_event self,
config: FrigateConfig,
camera_processes: dict[str, CameraMetricsTypes],
event_queue: Queue,
event_processed_queue: Queue,
stop_event: MpEvent,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "event_processor" self.name = "event_processor"
self.config = config self.config = config
self.camera_processes = camera_processes self.camera_processes = camera_processes
self.cached_clips = {}
self.event_queue = event_queue self.event_queue = event_queue
self.event_processed_queue = event_processed_queue self.event_processed_queue = event_processed_queue
self.events_in_process = {} self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self) -> None:
# set an end_time on events without an end_time on startup # set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where( Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None Event.end_time == None
@ -147,14 +159,15 @@ class EventProcessor(threading.Thread):
class EventCleanup(threading.Thread): class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event): def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "event_cleanup" self.name = "event_cleanup"
self.config = config self.config = config
self.stop_event = stop_event self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys()) self.camera_keys = list(self.config.cameras.keys())
def expire(self, media_type): def expire(self, media_type: str) -> None:
# TODO: Refactor media_type to enum
## Expire events from unlisted cameras based on the global config ## Expire events from unlisted cameras based on the global config
if media_type == "clips": if media_type == "clips":
retain_config = self.config.record.events.retain retain_config = self.config.record.events.retain
@ -253,7 +266,7 @@ class EventCleanup(threading.Thread):
) )
update_query.execute() update_query.execute()
def purge_duplicates(self): def purge_duplicates(self) -> None:
duplicate_query = """with grouped_events as ( duplicate_query = """with grouped_events as (
select id, select id,
label, label,
@ -287,7 +300,7 @@ class EventCleanup(threading.Thread):
.execute() .execute()
) )
def run(self): def run(self) -> None:
# only expire events every 5 minutes # only expire events every 5 minutes
while not self.stop_event.wait(300): while not self.stop_event.wait(300):
self.expire("clips") self.expire("clips")

View File

@ -34,6 +34,9 @@ disallow_untyped_calls = false
[mypy-frigate.const] [mypy-frigate.const]
ignore_errors = false ignore_errors = false
[mypy-frigate.events]
ignore_errors = false
[mypy-frigate.log] [mypy-frigate.log]
ignore_errors = false ignore_errors = false

View File

@ -8,7 +8,7 @@ import os
import requests import requests
from typing import Optional, Any from typing import Optional, Any
from paho.mqtt.client import Client from paho.mqtt.client import Client
from multiprocessing.synchronize import Event from multiprocessing.synchronize import Event as MpEvent
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
@ -148,7 +148,7 @@ class StatsEmitter(threading.Thread):
stats_tracking: StatsTrackingTypes, stats_tracking: StatsTrackingTypes,
mqtt_client: Client, mqtt_client: Client,
topic_prefix: str, topic_prefix: str,
stop_event: Event, stop_event: MpEvent,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "frigate_stats_emitter" self.name = "frigate_stats_emitter"

View File

@ -7,13 +7,13 @@ import signal
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
from frigate.util import restart_frigate from frigate.util import restart_frigate
from multiprocessing.synchronize import Event from multiprocessing.synchronize import Event as MpEvent
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class FrigateWatchdog(threading.Thread): class FrigateWatchdog(threading.Thread):
def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: Event): def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: MpEvent):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "frigate_watchdog" self.name = "frigate_watchdog"
self.detectors = detectors self.detectors = detectors