"""Handle external events created by the user.""" import base64 import datetime import logging import os import random import string from enum import Enum from typing import Optional import cv2 from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.events_updater import EventUpdatePublisher from frigate.config import CameraConfig, FrigateConfig from frigate.const import CLIPS_DIR from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.util.image import draw_box_with_label logger = logging.getLogger(__name__) class ManualEventState(str, Enum): complete = "complete" start = "start" end = "end" class ExternalEventProcessor: def __init__(self, config: FrigateConfig) -> None: self.config = config self.default_thumbnail = None self.event_sender = EventUpdatePublisher() self.detection_updater = DetectionPublisher(DetectionTypeEnum.api) self.event_camera = {} def create_manual_event( self, camera: str, label: str, source_type: str, sub_label: Optional[str], score: int, duration: Optional[int], include_recording: bool, draw: dict[str, any], snapshot_frame: any, ) -> str: now = datetime.datetime.now().timestamp() camera_config = self.config.cameras.get(camera) # create event id and start frame time rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) event_id = f"{now}-{rand_id}" thumbnail = self._write_images( camera_config, label, event_id, draw, snapshot_frame ) end = ( now + duration + camera_config.record.events.post_capture if duration is not None else None ) self.event_sender.publish( ( EventTypeEnum.api, EventStateEnum.start, camera, { "id": event_id, "label": label, "sub_label": sub_label, "score": score, "camera": camera, "start_time": now - camera_config.record.events.pre_capture, "end_time": end, "thumbnail": thumbnail, "has_clip": camera_config.record.enabled and include_recording, "has_snapshot": True, "type": source_type, }, ) ) if source_type == "api": self.event_camera[event_id] = camera self.detection_updater.publish( ( camera, now, { "state": ( ManualEventState.complete if end else ManualEventState.start ), "label": f"{label}: {sub_label}" if sub_label else label, "event_id": event_id, "end_time": end, }, ) ) return event_id def finish_manual_event(self, event_id: str, end_time: float) -> None: """Finish external event with indeterminate duration.""" self.event_sender.publish( ( EventTypeEnum.api, EventStateEnum.end, None, {"id": event_id, "end_time": end_time}, ) ) if event_id in self.event_camera: self.detection_updater.publish( ( self.event_camera[event_id], end_time, { "state": ManualEventState.end, "event_id": event_id, "end_time": end_time, }, ) ) self.event_camera.pop(event_id) def _write_images( self, camera_config: CameraConfig, label: str, event_id: str, draw: dict[str, any], img_frame: any, ) -> str: # write clean snapshot if enabled if camera_config.snapshots.clean_copy: ret, png = cv2.imencode(".png", img_frame) if ret: with open( os.path.join( CLIPS_DIR, f"{camera_config.name}-{event_id}-clean.png", ), "wb", ) as p: p.write(png.tobytes()) # write jpg snapshot with optional annotations if draw.get("boxes") and isinstance(draw.get("boxes"), list): for box in draw.get("boxes"): x = int(box["box"][0] * camera_config.detect.width) y = int(box["box"][1] * camera_config.detect.height) width = int(box["box"][2] * camera_config.detect.width) height = int(box["box"][3] * camera_config.detect.height) draw_box_with_label( img_frame, x, y, x + width, y + height, label, f"{box.get('score', '-')}% {int(width * height)}", thickness=2, color=box.get("color", (255, 0, 0)), ) ret, jpg = cv2.imencode(".jpg", img_frame) with open( os.path.join(CLIPS_DIR, f"{camera_config.name}-{event_id}.jpg"), "wb", ) as j: j.write(jpg.tobytes()) # create thumbnail with max height of 175 and save width = int(175 * img_frame.shape[1] / img_frame.shape[0]) thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA) ret, jpg = cv2.imencode(".jpg", thumb) return base64.b64encode(jpg.tobytes()).decode("utf-8") def stop(self): self.event_sender.stop() self.detection_updater.stop()