diff --git a/frigate/events.py b/frigate/events.py index 558cb2139..965651edb 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -3,6 +3,8 @@ import logging import os import queue import threading + +from enum import Enum from pathlib import Path from peewee import fn @@ -10,7 +12,6 @@ from peewee import fn from frigate.config import EventsConfig, FrigateConfig from frigate.const import CLIPS_DIR from frigate.models import Event -from frigate.timeline import TimelineSourceEnum from frigate.types import CameraMetricsTypes from frigate.util import to_relative_box @@ -21,6 +22,12 @@ from typing import Dict logger = logging.getLogger(__name__) +class EventTypeEnum(str, Enum): + # api = "api" + # audio = "audio" + tracked_object = "tracked_object" + + def should_update_db(prev_event: Event, current_event: Event) -> bool: """If current_event has updated fields and (clip or snapshot).""" if current_event["has_clip"] or current_event["has_snapshot"]: @@ -66,7 +73,9 @@ class EventProcessor(threading.Thread): while not self.stop_event.is_set(): try: - event_type, camera, event_data = self.event_queue.get(timeout=1) + source_type, event_type, camera, event_data = self.event_queue.get( + timeout=1 + ) except queue.Empty: continue @@ -75,100 +84,19 @@ class EventProcessor(threading.Thread): self.timeline_queue.put( ( camera, - TimelineSourceEnum.tracked_object, + source_type, event_type, self.events_in_process.get(event_data["id"]), event_data, ) ) - # if this is the first message, just store it and continue, its not time to insert it in the db - if event_type == "start": - self.events_in_process[event_data["id"]] = event_data - continue + if source_type == EventTypeEnum.tracked_object: + if event_type == "start": + self.events_in_process[event_data["id"]] = event_data + continue - if should_update_db(self.events_in_process[event_data["id"]], event_data): - camera_config = self.config.cameras[camera] - event_config: EventsConfig = camera_config.record.events - width = camera_config.detect.width - height = camera_config.detect.height - first_detector = list(self.config.detectors.values())[0] - - start_time = event_data["start_time"] - event_config.pre_capture - end_time = ( - None - if event_data["end_time"] is None - else event_data["end_time"] + event_config.post_capture - ) - # score of the snapshot - score = ( - None - if event_data["snapshot"] is None - else event_data["snapshot"]["score"] - ) - # detection region in the snapshot - region = ( - None - if event_data["snapshot"] is None - else to_relative_box( - width, - height, - event_data["snapshot"]["region"], - ) - ) - # bounding box for the snapshot - box = ( - None - if event_data["snapshot"] is None - else to_relative_box( - width, - height, - event_data["snapshot"]["box"], - ) - ) - - # keep these from being set back to false because the event - # may have started while recordings and snapshots were enabled - # this would be an issue for long running events - if self.events_in_process[event_data["id"]]["has_clip"]: - event_data["has_clip"] = True - if self.events_in_process[event_data["id"]]["has_snapshot"]: - event_data["has_snapshot"] = True - - event = { - Event.id: event_data["id"], - Event.label: event_data["label"], - Event.camera: camera, - Event.start_time: start_time, - Event.end_time: end_time, - Event.top_score: event_data["top_score"], - Event.score: score, - Event.zones: list(event_data["entered_zones"]), - Event.thumbnail: event_data["thumbnail"], - Event.region: region, - Event.box: box, - Event.has_clip: event_data["has_clip"], - Event.has_snapshot: event_data["has_snapshot"], - Event.model_hash: first_detector.model.model_hash, - Event.model_type: first_detector.model.model_type, - Event.detector_type: first_detector.type, - } - - ( - Event.insert(event) - .on_conflict( - conflict_target=[Event.id], - update=event, - ) - .execute() - ) - - # update the stored copy for comparison on future update messages - self.events_in_process[event_data["id"]] = event_data - - if event_type == "end": - del self.events_in_process[event_data["id"]] - self.event_processed_queue.put((event_data["id"], camera)) + self.handle_object_detection(event_type, camera, event_data) # set an end_time on events without an end_time before exiting Event.update(end_time=datetime.datetime.now().timestamp()).where( @@ -176,6 +104,99 @@ class EventProcessor(threading.Thread): ).execute() logger.info(f"Exiting event processor...") + def handle_object_detection( + self, + event_type: str, + camera: str, + event_data: Event, + ) -> None: + """handle tracked object event updates.""" + # if this is the first message, just store it and continue, its not time to insert it in the db + if should_update_db(self.events_in_process[event_data["id"]], event_data): + camera_config = self.config.cameras[camera] + event_config: EventsConfig = camera_config.record.events + width = camera_config.detect.width + height = camera_config.detect.height + first_detector = list(self.config.detectors.values())[0] + + start_time = event_data["start_time"] - event_config.pre_capture + end_time = ( + None + if event_data["end_time"] is None + else event_data["end_time"] + event_config.post_capture + ) + # score of the snapshot + score = ( + None + if event_data["snapshot"] is None + else event_data["snapshot"]["score"] + ) + # detection region in the snapshot + region = ( + None + if event_data["snapshot"] is None + else to_relative_box( + width, + height, + event_data["snapshot"]["region"], + ) + ) + # bounding box for the snapshot + box = ( + None + if event_data["snapshot"] is None + else to_relative_box( + width, + height, + event_data["snapshot"]["box"], + ) + ) + + # keep these from being set back to false because the event + # may have started while recordings and snapshots were enabled + # this would be an issue for long running events + if self.events_in_process[event_data["id"]]["has_clip"]: + event_data["has_clip"] = True + if self.events_in_process[event_data["id"]]["has_snapshot"]: + event_data["has_snapshot"] = True + + event = { + Event.id: event_data["id"], + Event.label: event_data["label"], + Event.camera: camera, + Event.start_time: start_time, + Event.end_time: end_time, + Event.zones: list(event_data["entered_zones"]), + Event.thumbnail: event_data["thumbnail"], + Event.has_clip: event_data["has_clip"], + Event.has_snapshot: event_data["has_snapshot"], + Event.model_hash: first_detector.model.model_hash, + Event.model_type: first_detector.model.model_type, + Event.detector_type: first_detector.type, + Event.data: { + "box": box, + "region": region, + "score": score, + "top_score": event_data["top_score"], + }, + } + + ( + Event.insert(event) + .on_conflict( + conflict_target=[Event.id], + update=event, + ) + .execute() + ) + + # update the stored copy for comparison on future update messages + self.events_in_process[event_data["id"]] = event_data + + if event_type == "end": + del self.events_in_process[event_data["id"]] + self.event_processed_queue.put((event_data["id"], camera)) + class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event: MpEvent): diff --git a/frigate/http.py b/frigate/http.py index cd2a0c523..198b2be16 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -44,7 +44,6 @@ from frigate.util import ( restart_frigate, vainfo_hwaccel, get_tz_modifiers, - to_relative_box, ) from frigate.storage import StorageMaintainer from frigate.version import VERSION @@ -196,7 +195,7 @@ def send_to_plus(id): return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations - if any(d > 1 for d in event.box): + if any(d > 1 for d in event.data["box"]): include_annotation = None if event.end_time is None: @@ -252,8 +251,8 @@ def send_to_plus(id): event.save() if not include_annotation is None: - region = event.region - box = event.box + region = event.data["region"] + box = event.data["box"] try: current_app.plus_api.add_annotation( @@ -294,7 +293,7 @@ def false_positive(id): return make_response(jsonify({"success": False, "message": message}), 404) # events from before the conversion to relative dimensions cant include annotations - if any(d > 1 for d in event.box): + if any(d > 1 for d in event.data["box"]): message = f"Events prior to 0.13 cannot be submitted as false positives" logger.error(message) return make_response(jsonify({"success": False, "message": message}), 400) @@ -311,11 +310,15 @@ def false_positive(id): # need to refetch the event now that it has a plus_id event = Event.get(Event.id == id) - region = event.region - box = event.box + region = event.data["region"] + box = event.data["box"] # provide top score if score is unavailable - score = event.top_score if event.score is None else event.score + score = ( + (event.data["top_score"] if event.data["top_score"] else event.top_score) + if event.data["score"] is None + else event.data["score"] + ) try: current_app.plus_api.add_false_positive( @@ -756,6 +759,7 @@ def events(): Event.top_score, Event.false_positive, Event.box, + Event.data, ] if camera != "all": diff --git a/frigate/models.py b/frigate/models.py index cc1a29f31..3770121cc 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -18,22 +18,33 @@ class Event(Model): # type: ignore[misc] camera = CharField(index=True, max_length=20) start_time = DateTimeField() end_time = DateTimeField() - top_score = FloatField() - score = FloatField() + top_score = ( + FloatField() + ) # TODO remove when columns can be dropped without rebuilding table + score = ( + FloatField() + ) # TODO remove when columns can be dropped without rebuilding table false_positive = BooleanField() zones = JSONField() thumbnail = TextField() has_clip = BooleanField(default=True) has_snapshot = BooleanField(default=True) - region = JSONField() - box = JSONField() - area = IntegerField() + region = ( + JSONField() + ) # TODO remove when columns can be dropped without rebuilding table + box = ( + JSONField() + ) # TODO remove when columns can be dropped without rebuilding table + area = ( + IntegerField() + ) # TODO remove when columns can be dropped without rebuilding table retain_indefinitely = BooleanField(default=False) ratio = FloatField(default=1.0) plus_id = CharField(max_length=30) model_hash = CharField(max_length=32) detector_type = CharField(max_length=32) model_type = CharField(max_length=32) + data = JSONField() # ex: tracked object box, region, etc. class Timeline(Model): # type: ignore[misc] diff --git a/frigate/object_processing.py b/frigate/object_processing.py index db2745309..cc1667ffb 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -21,6 +21,7 @@ from frigate.config import ( FrigateConfig, ) from frigate.const import CLIPS_DIR +from frigate.events import EventTypeEnum from frigate.util import ( SharedMemoryFrameManager, calculate_region, @@ -656,7 +657,9 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} def start(camera, obj: TrackedObject, current_frame_time): - self.event_queue.put(("start", camera, obj.to_dict())) + self.event_queue.put( + (EventTypeEnum.tracked_object, "start", camera, obj.to_dict()) + ) def update(camera, obj: TrackedObject, current_frame_time): obj.has_snapshot = self.should_save_snapshot(camera, obj) @@ -670,7 +673,12 @@ class TrackedObjectProcessor(threading.Thread): self.dispatcher.publish("events", json.dumps(message), retain=False) obj.previous = after self.event_queue.put( - ("update", camera, obj.to_dict(include_thumbnail=True)) + ( + EventTypeEnum.tracked_object, + "update", + camera, + obj.to_dict(include_thumbnail=True), + ) ) def end(camera, obj: TrackedObject, current_frame_time): @@ -722,7 +730,14 @@ class TrackedObjectProcessor(threading.Thread): } self.dispatcher.publish("events", json.dumps(message), retain=False) - self.event_queue.put(("end", camera, obj.to_dict(include_thumbnail=True))) + self.event_queue.put( + ( + EventTypeEnum.tracked_object, + "end", + camera, + obj.to_dict(include_thumbnail=True), + ) + ) def snapshot(camera, obj: TrackedObject, current_frame_time): mqtt_config: MqttConfig = self.config.cameras[camera].mqtt diff --git a/frigate/timeline.py b/frigate/timeline.py index c351e3e68..6b969bc0e 100644 --- a/frigate/timeline.py +++ b/frigate/timeline.py @@ -4,9 +4,8 @@ import logging import threading import queue -from enum import Enum - from frigate.config import FrigateConfig +from frigate.events import EventTypeEnum from frigate.models import Timeline from multiprocessing.queues import Queue @@ -17,12 +16,6 @@ from frigate.util import to_relative_box logger = logging.getLogger(__name__) -class TimelineSourceEnum(str, Enum): - # api = "api" - # audio = "audio" - tracked_object = "tracked_object" - - class TimelineProcessor(threading.Thread): """Handle timeline queue and update DB.""" @@ -51,7 +44,7 @@ class TimelineProcessor(threading.Thread): except queue.Empty: continue - if input_type == TimelineSourceEnum.tracked_object: + if input_type == EventTypeEnum.tracked_object: self.handle_object_detection( camera, event_type, prev_event_data, event_data ) diff --git a/migrations/015_event_refactor.py b/migrations/015_event_refactor.py new file mode 100644 index 000000000..d8a8a387c --- /dev/null +++ b/migrations/015_event_refactor.py @@ -0,0 +1,49 @@ +"""Peewee migrations + +Some examples (model - class or model name):: + + > Model = migrator.orm['model_name'] # Return model in current state by name + + > migrator.sql(sql) # Run custom SQL + > migrator.python(func, *args, **kwargs) # Run python code + > migrator.create_model(Model) # Create a model (could be used as decorator) + > migrator.remove_model(model, cascade=True) # Remove a model + > migrator.add_fields(model, **fields) # Add fields to a model + > migrator.change_fields(model, **fields) # Change fields + > migrator.remove_fields(model, *field_names, cascade=True) + > migrator.rename_field(model, old_field_name, new_field_name) + > migrator.rename_table(model, new_table_name) + > migrator.add_index(model, *col_names, unique=False) + > migrator.drop_index(model, *col_names) + > migrator.add_not_null(model, *field_names) + > migrator.drop_not_null(model, *field_names) + > migrator.add_default(model, field_name, default) + +""" + +import datetime as dt +import peewee as pw +from playhouse.sqlite_ext import * +from decimal import ROUND_HALF_EVEN +from frigate.models import Event + +try: + import playhouse.postgres_ext as pw_pext +except ImportError: + pass + +SQL = pw.SQL + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.drop_not_null( + Event, "top_score", "score", "region", "box", "area", "ratio" + ) + migrator.add_fields( + Event, + data=JSONField(default={}), + ) + + +def rollback(migrator, database, fake=False, **kwargs): + pass diff --git a/web/src/components/RecordingPlaylist.jsx b/web/src/components/RecordingPlaylist.jsx index 4d6f93842..4f6996afc 100644 --- a/web/src/components/RecordingPlaylist.jsx +++ b/web/src/components/RecordingPlaylist.jsx @@ -163,7 +163,9 @@ export function EventCard({ camera, event }) {