diff --git a/frigate/app.py b/frigate/app.py index 81050e30b..95a742515 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -45,6 +45,7 @@ from frigate.models import ( Recordings, RecordingsToDelete, Regions, + ReviewSegment, Timeline, ) from frigate.object_detection import ObjectDetectProcess @@ -55,6 +56,7 @@ from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup from frigate.record.record import manage_recordings +from frigate.review.review import manage_review_segments from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor @@ -283,6 +285,18 @@ class FrigateApp: self.processes["recording"] = recording_process.pid or 0 logger.info(f"Recording process started: {recording_process.pid}") + def init_review_segment_manager(self) -> None: + review_segment_process = mp.Process( + target=manage_review_segments, + name="review_segment_manager", + args=(self.config,), + ) + review_segment_process.daemon = True + self.review_segment_process = review_segment_process + review_segment_process.start() + self.processes["review_segment"] = review_segment_process.pid or 0 + logger.info(f"Recording process started: {review_segment_process.pid}") + def bind_database(self) -> None: """Bind db to the main process.""" # NOTE: all db accessing processes need to be created before the db can be bound to the main process @@ -297,7 +311,15 @@ class FrigateApp: 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) ), ) - models = [Event, Recordings, RecordingsToDelete, Previews, Regions, Timeline] + models = [ + Event, + Previews, + Recordings, + RecordingsToDelete, + Regions, + ReviewSegment, + Timeline, + ] self.db.bind(models) def init_stats(self) -> None: @@ -608,6 +630,7 @@ class FrigateApp: self.init_database() self.init_onvif() self.init_recording_manager() + self.init_review_segment_manager() self.init_go2rtc() self.bind_database() self.init_inter_process_communicator() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 84b84eb3c..bf551419a 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -6,8 +6,13 @@ from typing import Any, Callable, Optional from frigate.comms.config_updater import ConfigPublisher from frigate.config import BirdseyeModeEnum, FrigateConfig -from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID -from frigate.models import Previews, Recordings +from frigate.const import ( + INSERT_MANY_RECORDINGS, + INSERT_PREVIEW, + REQUEST_REGION_GRID, + UPSERT_REVIEW_SEGMENT, +) +from frigate.models import Previews, Recordings, ReviewSegment from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.types import PTZMetricsTypes from frigate.util.object import get_camera_regions_grid @@ -102,6 +107,15 @@ class Dispatcher: return grid elif topic == INSERT_PREVIEW: Previews.insert(payload).execute() + elif topic == UPSERT_REVIEW_SEGMENT: + ( + ReviewSegment.insert(payload) + .on_conflict( + conflict_target=[ReviewSegment.id], + update=payload, + ) + .execute() + ) else: self.publish(topic, payload, retain=False) diff --git a/frigate/config.py b/frigate/config.py index 3e240b9c8..4264842d8 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -58,6 +58,7 @@ if os.path.isdir("/run/secrets"): ).read_text() DEFAULT_TRACKED_OBJECTS = ["person"] +DEFAULT_ALERT_OBJECTS = ["person", "car"] DEFAULT_LISTEN_AUDIO = ["bark", "fire_alarm", "scream", "speech", "yell"] DEFAULT_DETECTORS = {"cpu": {"type": "cpu"}} DEFAULT_DETECT_DIMENSIONS = {"width": 1280, "height": 720} @@ -512,6 +513,9 @@ class ZoneConfig(BaseModel): class ObjectConfig(FrigateBaseModel): track: List[str] = Field(default=DEFAULT_TRACKED_OBJECTS, title="Objects to track.") + alert: List[str] = Field( + default=DEFAULT_ALERT_OBJECTS, title="Objects to create alerts for." + ) filters: Dict[str, FilterConfig] = Field(default={}, title="Object filters.") mask: Union[str, List[str]] = Field(default="", title="Object mask.") diff --git a/frigate/const.py b/frigate/const.py index 73f66af2f..62a202c37 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -70,6 +70,7 @@ MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to INSERT_MANY_RECORDINGS = "insert_many_recordings" INSERT_PREVIEW = "insert_preview" REQUEST_REGION_GRID = "request_region_grid" +UPSERT_REVIEW_SEGMENT = "upsert_review_segment" # Autotracking diff --git a/frigate/http.py b/frigate/http.py index 1ccc08ffe..179094b07 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -45,7 +45,7 @@ from frigate.const import ( RECORD_DIR, ) from frigate.events.external import ExternalEventProcessor -from frigate.models import Event, Previews, Recordings, Regions, Timeline +from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment, Timeline from frigate.object_processing import TrackedObject from frigate.plus import PlusApi from frigate.ptz.onvif import OnvifController @@ -2390,6 +2390,36 @@ def vod_event(id): ) +@bp.route("/review") +def review(): + camera = request.args.get("camera", "all") + limit = request.args.get("limit", 100) + severity = request.args.get("severity", None) + + before = request.args.get("before", type=float, default=datetime.now().timestamp()) + after = request.args.get( + "after", type=float, default=(datetime.now() - timedelta(hours=18)).timestamp() + ) + + clauses = [((ReviewSegment.start_time > after) & (ReviewSegment.end_time < before))] + + if camera != "all": + clauses.append((ReviewSegment.camera == camera)) + + if severity: + clauses.append((ReviewSegment.severity == severity)) + + review = ( + ReviewSegment.select() + .where(reduce(operator.and_, clauses)) + .order_by(ReviewSegment.start_time.desc()) + .limit(limit) + .dicts() + ) + + return jsonify([r for r in review]) + + @bp.route( "/export//start//end/", methods=["POST"] ) diff --git a/frigate/models.py b/frigate/models.py index 56d429b19..87424e3a8 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -76,6 +76,17 @@ class Recordings(Model): # type: ignore[misc] segment_size = FloatField(default=0) # this should be stored as MB +class ReviewSegment(Model): # type: ignore[misc] + id = CharField(null=False, primary_key=True, max_length=30) + camera = CharField(index=True, max_length=20) + start_time = DateTimeField() + end_time = DateTimeField() + has_been_reviewed = BooleanField(default=False) + severity = CharField(max_length=30) # alert, detection, significant_motion + thumb_path = CharField(unique=True) + data = JSONField() # additional data about detection like list of labels, zone, areas of significant motion + + class Previews(Model): # type: ignore[misc] id = CharField(null=False, primary_key=True, max_length=30) camera = CharField(index=True, max_length=20) diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index c2c7d32e7..86f1e63e1 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -9,7 +9,7 @@ from pathlib import Path from frigate.config import CameraConfig, FrigateConfig, RetainModeEnum from frigate.const import CACHE_DIR, RECORD_DIR -from frigate.models import Event, Previews, Recordings +from frigate.models import Event, Previews, Recordings, ReviewSegment from frigate.record.util import remove_empty_directories, sync_recordings from frigate.util.builtin import clear_and_unlink, get_tomorrow_at_time @@ -174,6 +174,65 @@ class RecordingCleanup(threading.Thread): Previews.id << deleted_previews_list[i : i + max_deletes] ).execute() + review_segments: list[ReviewSegment] = ( + ReviewSegment.select( + ReviewSegment.id, + ReviewSegment.start_time, + ReviewSegment.end_time, + ReviewSegment.thumb_path, + ) + .where( + ReviewSegment.camera == config.name, + ReviewSegment.end_time < expire_date, + ) + .order_by(ReviewSegment.start_time) + .namedtuples() + .iterator() + ) + + # expire review segments + recording_start = 0 + deleted_segments = set() + for segment in review_segments: + keep = False + # look for a reason to keep this segment + for idx in range(recording_start, len(kept_recordings)): + start_time, end_time = kept_recordings[idx] + + # if the recording starts in the future, stop checking recordings + # and let this segment expire + if start_time > segment.end_time: + keep = False + break + + # if the recording ends after the segment starts, keep it + # and stop looking at recordings + if end_time >= segment.start_time: + keep = True + break + + # if the recording ends before this segment starts, skip + # this recording and check the next recording for an overlap. + # since the kept recordings and segments are sorted, we can skip recordings + # that end before the current segment started + if end_time < segment.start_time: + recording_start = idx + + # Delete segments without any relevant recordings + if not keep: + Path(segment.thumb_path).unlink(missing_ok=True) + deleted_segments.add(segment.id) + + # expire segments + logger.debug(f"Expiring {len(deleted_segments)} segments") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_segments_list = list(deleted_segments) + for i in range(0, len(deleted_segments_list), max_deletes): + ReviewSegment.delete().where( + ReviewSegment.id << deleted_segments_list[i : i + max_deletes] + ).execute() + def expire_recordings(self) -> None: """Delete recordings based on retention config.""" logger.debug("Start expire recordings.") diff --git a/frigate/review/__init__.py b/frigate/review/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py new file mode 100644 index 000000000..ad34e8ba3 --- /dev/null +++ b/frigate/review/maintainer.py @@ -0,0 +1,328 @@ +"""Maintain review segments in db.""" + +import logging +import os +import random +import string +import threading +from enum import Enum +from multiprocessing.synchronize import Event as MpEvent +from typing import Optional + +import cv2 +import numpy as np + +from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import CameraConfig, FrigateConfig +from frigate.const import CLIPS_DIR, UPSERT_REVIEW_SEGMENT +from frigate.models import ReviewSegment +from frigate.object_processing import TrackedObject +from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop + +logger = logging.getLogger(__name__) + + +THUMB_HEIGHT = 180 +THUMB_WIDTH = 320 + + +class SeverityEnum(str, Enum): + alert = "alert" + detection = "detection" + signification_motion = "significant_motion" + + +class PendingReviewSegment: + def __init__( + self, + camera: str, + frame_time: float, + severity: SeverityEnum, + detections: set[str] = set(), + objects: set[str] = set(), + sub_labels: set[str] = set(), + zones: set[str] = set(), + audio: set[str] = set(), + motion: list[int] = [], + ): + rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) + self.id = f"{frame_time}-{rand_id}" + self.camera = camera + self.start_time = frame_time + self.severity = severity + self.detections = detections + self.objects = objects + self.sub_labels = sub_labels + self.zones = zones + self.audio = audio + self.sig_motion_areas = motion + self.last_update = frame_time + + # thumbnail + self.frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8) + self.frame_active_count = 0 + + def update_frame( + self, camera_config: CameraConfig, frame, objects: list[TrackedObject] + ): + min_x = camera_config.frame_shape[1] + min_y = camera_config.frame_shape[0] + max_x = 0 + max_y = 0 + + # find bounds for all boxes + for o in objects: + min_x = min(o["box"][0], min_x) + min_y = min(o["box"][1], min_y) + max_x = max(o["box"][2], max_x) + max_y = max(o["box"][3], max_y) + + region = calculate_16_9_crop( + camera_config.frame_shape, min_x, min_y, max_x, max_y + ) + + # could not find suitable 16:9 region + if not region: + return + + self.frame_active_count = len(objects) + color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) + color_frame = color_frame[region[1] : region[3], region[0] : region[2]] + width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0]) + self.frame = cv2.resize( + color_frame, dsize=(width, THUMB_HEIGHT), interpolation=cv2.INTER_AREA + ) + + def end(self) -> dict: + path = os.path.join(CLIPS_DIR, f"thumb-{self.camera}-{self.id}.jpg") + + if self.frame is not None: + cv2.imwrite(path, self.frame) + + return { + ReviewSegment.id: self.id, + ReviewSegment.camera: self.camera, + ReviewSegment.start_time: self.start_time, + ReviewSegment.end_time: self.last_update, + ReviewSegment.severity: self.severity.value, + ReviewSegment.thumb_path: path, + ReviewSegment.data: { + "detections": list(self.detections), + "objects": list(self.objects), + "sub_labels": list(self.sub_labels), + "zones": list(self.zones), + "audio": list(self.audio), + "significant_motion_areas": self.sig_motion_areas, + }, + } + + +class ReviewSegmentMaintainer(threading.Thread): + """Maintain review segments.""" + + def __init__(self, config: FrigateConfig, stop_event: MpEvent): + threading.Thread.__init__(self) + self.name = "review_segment_maintainer" + self.config = config + self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {} + self.frame_manager = SharedMemoryFrameManager() + + # create communication for review segments + self.requestor = InterProcessRequestor() + self.config_subscriber = ConfigSubscriber("config/record/") + self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) + + self.stop_event = stop_event + + def end_segment(self, segment: PendingReviewSegment) -> None: + """End segment.""" + self.requestor.send_data(UPSERT_REVIEW_SEGMENT, segment.end()) + self.active_review_segments[segment.camera] = None + + def update_existing_segment( + self, + segment: PendingReviewSegment, + frame_time: float, + objects: list[TrackedObject], + motion: list, + ) -> None: + """Validate if existing review segment should continue.""" + camera_config = self.config.cameras[segment.camera] + active_objects = get_active_objects(frame_time, camera_config, objects) + + if len(active_objects) > 0: + segment.last_update = frame_time + + # update type for this segment now that active objects are detected + if segment.severity == SeverityEnum.signification_motion: + segment.severity = SeverityEnum.detection + + if len(active_objects) > segment.frame_active_count: + frame_id = f"{camera_config.name}{frame_time}" + yuv_frame = self.frame_manager.get( + frame_id, camera_config.frame_shape_yuv + ) + segment.update_frame(camera_config, yuv_frame, active_objects) + self.frame_manager.close(frame_id) + + for object in active_objects: + segment.detections.add(object["id"]) + segment.objects.add(object["label"]) + + if object["sub_label"]: + segment.sub_labels.add(object["sub_label"]) + + # if object is alert label and has qualified for recording + # mark this review as alert + if ( + segment.severity == SeverityEnum.detection + and object["has_clip"] + and object["label"] in camera_config.objects.alert + ): + segment.severity = SeverityEnum.alert + + # keep zones up to date + if len(object["current_zones"]) > 0: + segment.zones.update(object["current_zones"]) + elif ( + segment.severity == SeverityEnum.signification_motion and len(motion) >= 20 + ): + segment.last_update = frame_time + else: + if segment.severity == SeverityEnum.alert and frame_time > ( + segment.last_update + 60 + ): + self.end_segment(segment) + elif frame_time > (segment.last_update + 10): + self.end_segment(segment) + + def check_if_new_segment( + self, + camera: str, + frame_time: float, + objects: list[TrackedObject], + motion: list, + ) -> None: + """Check if a new review segment should be created.""" + camera_config = self.config.cameras[camera] + active_objects = get_active_objects(frame_time, camera_config, objects) + + if len(active_objects) > 0: + has_sig_object = False + detections: set = set() + objects: set = set() + sub_labels: set = (set(),) + zones: set = set() + + for object in active_objects: + if ( + not has_sig_object + and object["has_clip"] + and object["label"] in camera_config.objects.alert + ): + has_sig_object = True + + detections.add(object["id"]) + objects.add(object["label"]) + + if object["sub_label"]: + sub_labels.add(object["sub_label"]) + + zones.update(object["current_zones"]) + + self.active_review_segments[camera] = PendingReviewSegment( + camera, + frame_time, + SeverityEnum.alert if has_sig_object else SeverityEnum.detection, + detections, + objects=objects, + sub_labels=sub_labels, + zones=zones, + ) + elif len(motion) >= 20: + self.active_review_segments[camera] = PendingReviewSegment( + camera, frame_time, SeverityEnum.signification_motion, motion=motion + ) + + def run(self) -> None: + while not self.stop_event.is_set(): + # check if there is an updated config + while True: + ( + updated_topic, + updated_record_config, + ) = self.config_subscriber.check_for_update() + + if not updated_topic: + break + + camera_name = updated_topic.rpartition("/")[-1] + self.config.cameras[camera_name].record = updated_record_config + + (topic, data) = self.detection_subscriber.get_data(timeout=1) + + if not topic: + continue + + if topic == DetectionTypeEnum.video: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + elif topic == DetectionTypeEnum.audio: + ( + camera, + frame_time, + dBFS, + audio_detections, + ) = data + + if not self.config.cameras[camera].record.enabled: + continue + + current_segment = self.active_review_segments.get(camera) + + if current_segment is not None: + if topic == DetectionTypeEnum.video: + self.update_existing_segment( + current_segment, + frame_time, + current_tracked_objects, + motion_boxes, + ) + elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: + current_segment.last_update = frame_time + current_segment.audio.update(audio_detections) + else: + if topic == DetectionTypeEnum.video: + self.check_if_new_segment( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + ) + elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: + self.active_review_segments[camera] = PendingReviewSegment( + camera, + frame_time, + SeverityEnum.detection, + audio=set(audio_detections), + ) + + +def get_active_objects( + frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] +) -> list[TrackedObject]: + """get active objects for detection.""" + return [ + o + for o in all_objects + if o["motionless_count"] < camera_config.detect.stationary.threshold + and o["frame_time"] == frame_time + and not o["false_positive"] + ] diff --git a/frigate/review/review.py b/frigate/review/review.py new file mode 100644 index 000000000..dafa6c802 --- /dev/null +++ b/frigate/review/review.py @@ -0,0 +1,36 @@ +"""Run recording maintainer and cleanup.""" + +import logging +import multiprocessing as mp +import signal +import threading +from types import FrameType +from typing import Optional + +from setproctitle import setproctitle + +from frigate.config import FrigateConfig +from frigate.review.maintainer import ReviewSegmentMaintainer +from frigate.util.services import listen + +logger = logging.getLogger(__name__) + + +def manage_review_segments(config: FrigateConfig) -> None: + stop_event = mp.Event() + + def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + threading.current_thread().name = "process:review_segment_manager" + setproctitle("frigate.review_segment_manager") + listen() + + maintainer = ReviewSegmentMaintainer( + config, + stop_event, + ) + maintainer.start() diff --git a/frigate/util/image.py b/frigate/util/image.py index c9da2ae3b..0afa451b7 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -211,6 +211,51 @@ def calculate_region(frame_shape, xmin, ymin, xmax, ymax, model_size, multiplier return (x_offset, y_offset, x_offset + size, y_offset + size) +def calculate_16_9_crop(frame_shape, xmin, ymin, xmax, ymax, multiplier=1.25): + min_size = 200 + + # size is the longest edge and divisible by 4 + x_size = int(xmax - xmin * multiplier) + + if x_size < min_size: + x_size = min_size + + y_size = int(ymax - ymin * multiplier) + + if y_size < min_size: + y_size = min_size + + # calculate 16x9 using height + aspect_y_size = int(9 / 16 * x_size) + + # if 16:9 by height is too small + if aspect_y_size < y_size or aspect_y_size > frame_shape[0]: + x_size = int((16 / 9) * y_size) // 4 * 4 + + if x_size / y_size > 1.8: + return None + else: + y_size = aspect_y_size // 4 * 4 + + # x_offset is midpoint of bounding box minus half the size + x_offset = int((xmax - xmin) / 2.0 + xmin - x_size / 2.0) + # if outside the image + if x_offset < 0: + x_offset = 0 + elif x_offset > (frame_shape[1] - x_size): + x_offset = max(0, (frame_shape[1] - x_size)) + + # y_offset is midpoint of bounding box minus half the size + y_offset = int((ymax - ymin) / 2.0 + ymin - y_size / 2.0) + # # if outside the image + if y_offset < 0: + y_offset = 0 + elif y_offset > (frame_shape[0] - y_size): + y_offset = max(0, (frame_shape[0] - y_size)) + + return (x_offset, y_offset, x_offset + x_size, y_offset + y_size) + + def get_yuv_crop(frame_shape, crop): # crop should be (x1,y1,x2,y2) frame_height = frame_shape[0] // 3 * 2 diff --git a/migrations/022_create_review_segment_table.py b/migrations/022_create_review_segment_table.py new file mode 100644 index 000000000..681795e37 --- /dev/null +++ b/migrations/022_create_review_segment_table.py @@ -0,0 +1,42 @@ +"""Peewee migrations -- 022_create_review_segment_table.py. + +Some examples (model - class or model name):: + + > Model = migrator.orm['model_name'] # Return model in current state by name + + > migrator.sql(sql) # Run custom SQL + > migrator.python(func, *args, **kwargs) # Run python code + > migrator.create_model(Model) # Create a model (could be used as decorator) + > migrator.remove_model(model, cascade=True) # Remove a model + > migrator.add_fields(model, **fields) # Add fields to a model + > migrator.change_fields(model, **fields) # Change fields + > migrator.remove_fields(model, *field_names, cascade=True) + > migrator.rename_field(model, old_field_name, new_field_name) + > migrator.rename_table(model, new_table_name) + > migrator.add_index(model, *col_names, unique=False) + > migrator.drop_index(model, *col_names) + > migrator.add_not_null(model, *field_names) + > migrator.drop_not_null(model, *field_names) + > migrator.add_default(model, field_name, default) + +""" + +import peewee as pw + +SQL = pw.SQL + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.sql( + 'CREATE TABLE IF NOT EXISTS "reviewsegment" ("id" VARCHAR(30) NOT NULL PRIMARY KEY, "camera" VARCHAR(20) NOT NULL, "start_time" DATETIME NOT NULL, "end_time" DATETIME, "has_been_reviewed" INTEGER NOT NULL, "severity" VARCHAR(30) NOT NULL, "thumb_path" VARCHAR(255) NOT NULL, "data" JSON NOT NULL)' + ) + migrator.sql( + 'CREATE INDEX IF NOT EXISTS "review_segment_camera" ON "reviewsegment" ("camera")' + ) + migrator.sql( + 'CREATE INDEX "review_segment_start_time_end_time" ON "reviewsegment" ("start_time" DESC, "end_time" DESC)' + ) + + +def rollback(migrator, database, fake=False, **kwargs): + pass