blakeblackshear.frigate/frigate/review/maintainer.py

433 lines
16 KiB
Python
Raw Normal View History

"""Maintain review segments in db."""
import json
import logging
import os
import random
import string
import sys
import threading
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
import cv2
import numpy as np
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import ALL_ATTRIBUTE_LABELS, CLIPS_DIR, UPSERT_REVIEW_SEGMENT
from frigate.events.external import ManualEventState
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)
THUMB_HEIGHT = 180
THUMB_WIDTH = 320
THRESHOLD_ALERT_ACTIVITY = 120
THRESHOLD_DETECTION_ACTIVITY = 30
THRESHOLD_MOTION_ACTIVITY = 30
class SeverityEnum(str, Enum):
alert = "alert"
detection = "detection"
signification_motion = "significant_motion"
class PendingReviewSegment:
def __init__(
self,
camera: str,
frame_time: float,
severity: SeverityEnum,
detections: dict[str, str],
zones: set[str] = set(),
audio: set[str] = set(),
motion: list[int] = [],
):
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
self.id = f"{frame_time}-{rand_id}"
self.camera = camera
self.start_time = frame_time
self.severity = severity
self.detections = detections
self.zones = zones
self.audio = audio
self.sig_motion_areas = motion
self.last_update = frame_time
# thumbnail
self.frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8)
self.frame_active_count = 0
self.frame_path = os.path.join(CLIPS_DIR, f"thumb-{self.camera}-{self.id}.jpg")
def update_frame(
self, camera_config: CameraConfig, frame, objects: list[TrackedObject]
):
min_x = camera_config.frame_shape[1]
min_y = camera_config.frame_shape[0]
max_x = 0
max_y = 0
# find bounds for all boxes
for o in objects:
min_x = min(o["box"][0], min_x)
min_y = min(o["box"][1], min_y)
max_x = max(o["box"][2], max_x)
max_y = max(o["box"][3], max_y)
region = calculate_16_9_crop(
camera_config.frame_shape, min_x, min_y, max_x, max_y
)
# could not find suitable 16:9 region
if not region:
return
self.frame_active_count = len(objects)
color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
color_frame = color_frame[region[1] : region[3], region[0] : region[2]]
width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0])
self.frame = cv2.resize(
color_frame, dsize=(width, THUMB_HEIGHT), interpolation=cv2.INTER_AREA
)
if self.frame is not None:
cv2.imwrite(
self.frame_path, self.frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
)
def get_data(self, ended: bool) -> dict:
return {
ReviewSegment.id: self.id,
ReviewSegment.camera: self.camera,
ReviewSegment.start_time: self.start_time,
ReviewSegment.end_time: self.last_update if ended else None,
ReviewSegment.severity: self.severity.value,
ReviewSegment.thumb_path: self.frame_path,
ReviewSegment.data: {
"detections": list(set(self.detections.keys())),
"objects": list(set(self.detections.values())),
"zones": list(self.zones),
"audio": list(self.audio),
"significant_motion_areas": self.sig_motion_areas,
},
}
class ReviewSegmentMaintainer(threading.Thread):
"""Maintain review segments."""
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "review_segment_maintainer"
self.config = config
self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {}
self.frame_manager = SharedMemoryFrameManager()
# create communication for review segments
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
# manual events
self.indefinite_events: dict[str, dict[str, any]] = {}
self.stop_event = stop_event
def update_segment(self, segment: PendingReviewSegment) -> None:
"""Update segment."""
seg_data = segment.get_data(ended=False)
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, seg_data)
self.requestor.send_data(
"reviews",
json.dumps(
{"type": "update", "review": {k.name: v for k, v in seg_data.items()}}
),
)
def end_segment(self, segment: PendingReviewSegment) -> None:
"""End segment."""
seg_data = segment.get_data(ended=True)
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, seg_data)
self.requestor.send_data(
"reviews",
json.dumps(
{"type": "end", "review": {k.name: v for k, v in seg_data.items()}}
),
)
self.active_review_segments[segment.camera] = None
def update_existing_segment(
self,
segment: PendingReviewSegment,
frame_time: float,
objects: list[TrackedObject],
motion: list,
) -> None:
"""Validate if existing review segment should continue."""
camera_config = self.config.cameras[segment.camera]
active_objects = get_active_objects(frame_time, camera_config, objects)
if len(active_objects) > 0:
if frame_time > segment.last_update:
segment.last_update = frame_time
# update type for this segment now that active objects are detected
if segment.severity == SeverityEnum.signification_motion:
segment.severity = SeverityEnum.detection
if len(active_objects) > segment.frame_active_count:
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get(
frame_id, camera_config.frame_shape_yuv
)
segment.update_frame(camera_config, yuv_frame, active_objects)
self.frame_manager.close(frame_id)
self.update_segment(segment)
for object in active_objects:
if not object["sub_label"]:
segment.detections[object["id"]] = object["label"]
elif object["sub_label"][0] in ALL_ATTRIBUTE_LABELS:
segment.detections[object["id"]] = object["sub_label"][0]
else:
segment.detections[object["id"]] = f'{object["label"]}-verified'
# if object is alert label and has qualified for recording
# mark this review as alert
if (
segment.severity == SeverityEnum.detection
and object["has_clip"]
and object["label"] in camera_config.objects.alert
):
segment.severity = SeverityEnum.alert
# keep zones up to date
if len(object["current_zones"]) > 0:
segment.zones.update(object["current_zones"])
elif (
segment.severity == SeverityEnum.signification_motion
and len(motion) >= THRESHOLD_MOTION_ACTIVITY
):
if frame_time > segment.last_update:
segment.last_update = frame_time
else:
if segment.severity == SeverityEnum.alert and frame_time > (
segment.last_update + THRESHOLD_ALERT_ACTIVITY
):
self.end_segment(segment)
elif frame_time > (segment.last_update + THRESHOLD_DETECTION_ACTIVITY):
self.end_segment(segment)
def check_if_new_segment(
self,
camera: str,
frame_time: float,
objects: list[TrackedObject],
motion: list,
) -> None:
"""Check if a new review segment should be created."""
camera_config = self.config.cameras[camera]
active_objects = get_active_objects(frame_time, camera_config, objects)
if len(active_objects) > 0:
has_sig_object = False
detections: dict[str, str] = {}
zones: set = set()
for object in active_objects:
if (
not has_sig_object
and object["has_clip"]
and object["label"] in camera_config.objects.alert
):
has_sig_object = True
if not object["sub_label"]:
detections[object["id"]] = object["label"]
elif object["sub_label"][0] in ALL_ATTRIBUTE_LABELS:
detections[object["id"]] = object["sub_label"][0]
else:
detections[object["id"]] = f'{object["label"]}-verified'
zones.update(object["current_zones"])
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.alert if has_sig_object else SeverityEnum.detection,
detections,
audio=set(),
zones=zones,
motion=[],
)
frame_id = f"{camera_config.name}{frame_time}"
yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv)
self.active_review_segments[camera].update_frame(
camera_config, yuv_frame, active_objects
)
self.frame_manager.close(frame_id)
self.update_segment(self.active_review_segments[camera])
elif len(motion) >= 20:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.signification_motion,
detections={},
audio=set(),
motion=motion,
zones=set(),
)
def run(self) -> None:
while not self.stop_event.is_set():
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
(topic, data) = self.detection_subscriber.get_data(timeout=1)
if not topic:
continue
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
elif topic == DetectionTypeEnum.api:
(
camera,
frame_time,
manual_info,
) = data
if camera not in self.indefinite_events:
self.indefinite_events[camera] = {}
if not self.config.cameras[camera].record.enabled:
continue
current_segment = self.active_review_segments.get(camera)
if current_segment is not None:
if topic == DetectionTypeEnum.video:
self.update_existing_segment(
current_segment,
frame_time,
current_tracked_objects,
motion_boxes,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
if frame_time > current_segment.last_update:
current_segment.last_update = frame_time
current_segment.audio.update(audio_detections)
elif topic == DetectionTypeEnum.api:
if manual_info["state"] == ManualEventState.complete:
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
current_segment.last_update = manual_info["end_time"]
elif manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
# temporarily make it so this event can not end
current_segment.last_update = sys.maxsize
elif manual_info["state"] == ManualEventState.end:
self.indefinite_events[camera].pop(manual_info["event_id"])
current_segment.last_update = manual_info["end_time"]
else:
if topic == DetectionTypeEnum.video:
self.check_if_new_segment(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.detection,
{},
set(),
set(audio_detections),
[],
)
elif topic == DetectionTypeEnum.api:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.alert,
{manual_info["event_id"]: manual_info["label"]},
set(),
set(),
[],
)
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[camera] = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[camera].last_update = manual_info[
"end_time"
]
self.config_subscriber.stop()
self.requestor.stop()
self.detection_subscriber.stop()
logger.info("Exiting review maintainer...")
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
"""get active objects for detection."""
return [
o
for o in all_objects
if o["motionless_count"] < camera_config.detect.stationary.threshold
and o["position_changes"] > 0
and o["frame_time"] == frame_time
and not o["false_positive"]
]